From d91b0d803d081a534a9317eb166f275ef471283c Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 9 Jul 2020 13:01:07 -0400 Subject: [PATCH 1/2] [Elastic Agent] Fix saving of agent configuration on Windows to have proper ACLs (#19793) * Fix permissions for windows to set proper ACLs * Generate NOTICE.txt. --- NOTICE.txt | 19 +++++++++ go.mod | 1 + go.sum | 3 ++ .../pkg/agent/storage/storage.go | 16 ++++++++ .../pkg/agent/storage/storage_test.go | 40 ++++++++----------- 5 files changed, 55 insertions(+), 24 deletions(-) diff --git a/NOTICE.txt b/NOTICE.txt index eae38ab5fda..1ed31b1ddb0 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -11014,6 +11014,25 @@ Exhibit B - "Incompatible With Secondary Licenses" Notice +-------------------------------------------------------------------------------- +Dependency : github.com/hectane/go-acl +Version: v0.0.0-20190604041725-da78bae5fc95 +Licence type (autodetected): MIT +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/hectane/go-acl@v0.0.0-20190604041725-da78bae5fc95/LICENSE.txt: + +The MIT License (MIT) + +Copyright (c) 2015 Nathan Osman + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + + -------------------------------------------------------------------------------- Dependency : github.com/elastic/dhcp Version: v0.0.0-20200227161230-57ec251c7eb3 diff --git a/go.mod b/go.mod index 62f0ceb5c33..5f7655f143c 100644 --- a/go.mod +++ b/go.mod @@ -100,6 +100,7 @@ require ( github.com/hashicorp/go-multierror v1.1.0 github.com/hashicorp/go-retryablehttp v0.6.6 github.com/hashicorp/golang-lru v0.5.2-0.20190520140433-59383c442f7d // indirect + github.com/hectane/go-acl v0.0.0-20190604041725-da78bae5fc95 github.com/insomniacslk/dhcp v0.0.0-20180716145214-633285ba52b2 github.com/jmoiron/sqlx v1.2.1-0.20190826204134-d7d95172beb5 github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 diff --git a/go.sum b/go.sum index 84cee99d57d..fd1e638b44b 100644 --- a/go.sum +++ b/go.sum @@ -427,6 +427,8 @@ github.com/haya14busa/go-actions-toolkit v0.0.0-20200105081403-ca0307860f01 h1:H github.com/haya14busa/go-actions-toolkit v0.0.0-20200105081403-ca0307860f01/go.mod h1:1DWDZmeYf0LX30zscWb7K9rUMeirNeBMd5Dum+seUhc= github.com/haya14busa/go-checkstyle v0.0.0-20170303121022-5e9d09f51fa1/go.mod h1:RsN5RGgVYeXpcXNtWyztD5VIe7VNSEqpJvF2iEH7QvI= github.com/haya14busa/secretbox v0.0.0-20180525171038-07c7ecf409f5/go.mod h1:FGO/dXIFZnan7KvvUSFk1hYMnoVNzB6NTMPrmke8SSI= +github.com/hectane/go-acl v0.0.0-20190604041725-da78bae5fc95 h1:S4qyfL2sEm5Budr4KVMyEniCy+PbS55651I/a+Kn/NQ= +github.com/hectane/go-acl v0.0.0-20190604041725-da78bae5fc95/go.mod h1:QiyDdbZLaJ/mZP4Zwc9g2QsfaEA4o7XvvgZegSci5/E= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= @@ -818,6 +820,7 @@ golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190514135907-3a4b5fb9f71f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190529164535-6a60838ec259/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/x-pack/elastic-agent/pkg/agent/storage/storage.go b/x-pack/elastic-agent/pkg/agent/storage/storage.go index 0c8ff360b87..b37c6e06ab3 100644 --- a/x-pack/elastic-agent/pkg/agent/storage/storage.go +++ b/x-pack/elastic-agent/pkg/agent/storage/storage.go @@ -12,6 +12,8 @@ import ( "os" "time" + "github.com/hectane/go-acl" + "github.com/elastic/beats/v7/libbeat/common/file" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" ) @@ -129,6 +131,13 @@ func (r *ReplaceOnSuccessStore) Save(in io.Reader) error { } } + if err := acl.Chmod(r.target, perms); err != nil { + return errors.New(err, + fmt.Sprintf("could not set permissions target file %s", r.target), + errors.TypeFilesystem, + errors.M(errors.MetaKeyPath, r.target)) + } + return nil } @@ -178,6 +187,13 @@ func (d *DiskStore) Save(in io.Reader) error { errors.M(errors.MetaKeyPath, d.target)) } + if err := acl.Chmod(d.target, perms); err != nil { + return errors.New(err, + fmt.Sprintf("could not set permissions target file %s", d.target), + errors.TypeFilesystem, + errors.M(errors.MetaKeyPath, d.target)) + } + return nil } diff --git a/x-pack/elastic-agent/pkg/agent/storage/storage_test.go b/x-pack/elastic-agent/pkg/agent/storage/storage_test.go index 3d53f43cdb6..52cd1960b22 100644 --- a/x-pack/elastic-agent/pkg/agent/storage/storage_test.go +++ b/x-pack/elastic-agent/pkg/agent/storage/storage_test.go @@ -11,6 +11,7 @@ import ( "io/ioutil" "os" "path/filepath" + "runtime" "testing" "time" @@ -49,11 +50,7 @@ func TestReplaceOrRollbackStore(t *testing.T) { require.True(t, bytes.Equal(writtenContent, replaceWith)) requireFilesCount(t, dir, 2) - - info, err := os.Stat(target) - require.NoError(t, err) - - require.Equal(t, perms, info.Mode()) + checkPerms(t, target, perms) }) t.Run("when save is not successful", func(t *testing.T) { @@ -103,10 +100,6 @@ func TestReplaceOrRollbackStore(t *testing.T) { require.True(t, bytes.Equal(writtenContent, replaceWith)) requireFilesCount(t, dir, 1) - info, err := os.Stat(target) - require.NoError(t, err) - - require.Equal(t, perms, info.Mode()) }) t.Run("when target file do not exist", func(t *testing.T) { @@ -135,11 +128,7 @@ func TestDiskStore(t *testing.T) { require.NoError(t, err) require.Equal(t, msg, content) - - info, err := os.Stat(target) - require.NoError(t, err) - - require.Equal(t, perms, info.Mode()) + checkPerms(t, target, perms) }) t.Run("when the target do no exist", func(t *testing.T) { @@ -158,11 +147,7 @@ func TestDiskStore(t *testing.T) { require.NoError(t, err) require.Equal(t, msg, content) - - info, err := os.Stat(target) - require.NoError(t, err) - - require.Equal(t, perms, info.Mode()) + checkPerms(t, target, perms) }) t.Run("return an io.ReadCloser to the target file", func(t *testing.T) { @@ -178,11 +163,7 @@ func TestDiskStore(t *testing.T) { content, err := ioutil.ReadAll(r) require.NoError(t, err) require.Equal(t, msg, content) - - info, err := os.Stat(target) - require.NoError(t, err) - - require.Equal(t, perms, info.Mode()) + checkPerms(t, target, perms) }) } @@ -210,3 +191,14 @@ func requireFilesCount(t *testing.T, dir string, l int) { require.NoError(t, err) require.Equal(t, l, len(files)) } + +func checkPerms(t *testing.T, target string, expected os.FileMode) { + t.Helper() + if runtime.GOOS == "windows" { + // Windows API validation of ACL is skipped, as its very complicated. + return + } + info, err := os.Stat(target) + require.NoError(t, err) + require.Equal(t, expected, info.Mode()) +} From d53cd128ef0705017218c5b421196c9e7ce60b1e Mon Sep 17 00:00:00 2001 From: Alex K <8418476+fearful-symmetry@users.noreply.github.com> Date: Thu, 9 Jul 2020 10:11:02 -0700 Subject: [PATCH 2/2] Add `docker logs` support to the Elastic Log Driver (#19531) * init commit of docker logs support * remove vendor * fix tests * mage fmt * code cleanup * change logging directories around, add code to remove containers * remove error message on EOF * fix config, add docs * more fixes, migrate to writing logs to docker's own location * add changelog entry * docs cleanup --- CHANGELOG.next.asciidoc | 3 + x-pack/dockerlogbeat/config.json | 24 +++ .../dockerlogbeat/docs/configuration.asciidoc | 73 ++++++++- x-pack/dockerlogbeat/handlers.go | 50 +++++- x-pack/dockerlogbeat/main.go | 17 ++- .../pipelinemanager/clientLogReader.go | 67 ++++++--- .../pipelinemanager/clientLogReader_test.go | 17 ++- .../dockerlogbeat/pipelinemanager/config.go | 2 +- .../pipelinemanager/pipelineManager.go | 142 +++++++++++++++++- x-pack/dockerlogbeat/pipereader/reader.go | 2 +- x-pack/dockerlogbeat/readme.md | 10 ++ 11 files changed, 371 insertions(+), 36 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 9c18e744ccf..76ebb7f0002 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -560,6 +560,9 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add registry and code signature information and ECS categorization fields for sysmon module {pull}18058[18058] - Add new winlogbeat security dashboard {pull}18775[18775] +*Elastic Log Driver* +- Add support for `docker logs` command {pull}19531[19531] + ==== Deprecated *Affecting all Beats* diff --git a/x-pack/dockerlogbeat/config.json b/x-pack/dockerlogbeat/config.json index d3072a841bc..5d7d1d3c745 100644 --- a/x-pack/dockerlogbeat/config.json +++ b/x-pack/dockerlogbeat/config.json @@ -13,7 +13,31 @@ ], "socket": "beatSocket.sock" }, + "mounts": [ + { + "name": "LOG_DIR", + "description": "Mount for local log cache", + "destination": "/var/log/docker", + "source": "/var/lib/docker", + "type": "none", + "options": [ + "rw", + "rbind" + ], + "Settable": [ + "source" + ] + } + ], "env": [ + { + "description": "Destroy logs after a container has stopped", + "name": "DESTROY_LOGS_ON_STOP", + "value": "false", + "Settable": [ + "value" + ] + }, { "description": "debug level", "name": "LOG_DRIVER_LEVEL", diff --git a/x-pack/dockerlogbeat/docs/configuration.asciidoc b/x-pack/dockerlogbeat/docs/configuration.asciidoc index 708eb941a91..f1bf6821489 100644 --- a/x-pack/dockerlogbeat/docs/configuration.asciidoc +++ b/x-pack/dockerlogbeat/docs/configuration.asciidoc @@ -49,10 +49,6 @@ format is `"username:password"`. [[es-output-options]] === {es} output options -// TODO: Add the following settings. Syntax is a little different so we might -// need to add deameon examples that show how to specify these settings: -// `output.elasticsearch.indices -// `output.elasticsearch.pipelines` [options="header"] |===== @@ -117,3 +113,72 @@ for more information about the environment variables. |===== + + +[float] +[[local-log-opts]] +=== Configuring the local log +This plugin fully supports `docker logs`, and it maintains a local copy of logs that can be read without a connection to Elasticsearch. The plugin mounts the `/var/lib/docker` directory on the host to write logs to `/var/log/containers` on the host. If you want to change the log location on the host, you must change the mount inside the plugin: + +1. Disable the plugin: ++ +["source","sh",subs="attributes"] +---- +docker plugin disable elastic/{log-driver-alias}:{version} +---- + +2. Set the bindmount directory: ++ +["source","sh",subs="attributes"] +---- +docker plugin set elastic/{log-driver-alias}:{version} LOG_DIR.source=NEW_LOG_LOCATION +---- ++ + +3. Enable the plugin: ++ +["source","sh",subs="attributes"] +---- +docker plugin enable elastic/{log-driver-alias}:{version} +---- + + +The local log also supports the `max-file`, `max-size` and `compress` options that are https://docs.docker.com/config/containers/logging/json-file/#options[a part of the Docker default file logger]. For example: + +["source","sh",subs="attributes"] +---- +docker run --log-driver=elastic/{log-driver-alias}:{version} \ + --log-opt endpoint="myhost:9200" \ + --log-opt user="myusername" \ + --log-opt password="mypassword" \ + --log-opt max-file=10 \ + --log-opt max-size=5M \ + --log-opt compress=true \ + -it debian:jessie /bin/bash +---- + + +In situations where logs can't be easily managed, for example, you can also configure the plugin to remove log files when a container is stopped. This will prevent you from reading logs on a stopped container, but it will rotate logs without user intervention. To enable removal of logs for stopped containers, you must change the `DESTROY_LOGS_ON_STOP` environment variable: + +1. Disable the plugin: ++ +["source","sh",subs="attributes"] +---- +docker plugin disable elastic/{log-driver-alias}:{version} +---- + +2. Enable log removal: ++ +["source","sh",subs="attributes"] +---- +docker plugin set elastic/{log-driver-alias}:{version} DESTROY_LOGS_ON_STOP=true +---- ++ + +3. Enable the plugin: ++ +["source","sh",subs="attributes"] +---- +docker plugin enable elastic/{log-driver-alias}:{version} +---- + diff --git a/x-pack/dockerlogbeat/handlers.go b/x-pack/dockerlogbeat/handlers.go index 604c029e601..8b3a771a741 100644 --- a/x-pack/dockerlogbeat/handlers.go +++ b/x-pack/dockerlogbeat/handlers.go @@ -6,12 +6,14 @@ package main import ( "encoding/json" + "io" "net/http" "github.com/docker/docker/daemon/logger" "github.com/elastic/beats/v7/x-pack/dockerlogbeat/pipelinemanager" + "github.com/docker/docker/pkg/ioutils" "github.com/pkg/errors" ) @@ -26,6 +28,26 @@ type StopLoggingRequest struct { File string } +// capabilitiesResponse represents the response to a capabilities request +type capabilitiesResponse struct { + Err string + Cap logger.Capability +} + +// logsRequest represents the request object we get from a `docker logs` call +type logsRequest struct { + Info logger.Info + Config logger.ReadConfig +} + +func reportCaps() func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + json.NewEncoder(w).Encode(&capabilitiesResponse{ + Cap: logger.Capability{ReadLogs: true}, + }) + } +} + // This gets called when a container starts that requests the log driver func startLoggingHandler(pm *pipelinemanager.PipelineManager) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { @@ -36,7 +58,7 @@ func startLoggingHandler(pm *pipelinemanager.PipelineManager) func(w http.Respon return } - pm.Logger.Infof("Got start request object from container %#v\n", startReq.Info.ContainerName) + pm.Logger.Debugf("Got start request object from container %#v\n", startReq.Info.ContainerName) pm.Logger.Debugf("Got a container with the following labels: %#v\n", startReq.Info.ContainerLabels) pm.Logger.Debugf("Got a container with the following log opts: %#v\n", startReq.Info.Config) @@ -67,7 +89,7 @@ func stopLoggingHandler(pm *pipelinemanager.PipelineManager) func(w http.Respons http.Error(w, errors.Wrap(err, "error decoding json request").Error(), http.StatusBadRequest) return } - pm.Logger.Infof("Got stop request object %#v\n", stopReq) + pm.Logger.Debugf("Got stop request object %#v\n", stopReq) // Run the stop async, since nothing 'depends' on it, // and we can break people's docker automation if this times out. go func() { @@ -81,6 +103,30 @@ func stopLoggingHandler(pm *pipelinemanager.PipelineManager) func(w http.Respons } // end func } +func readLogHandler(pm *pipelinemanager.PipelineManager) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + var logReq logsRequest + err := json.NewDecoder(r.Body).Decode(&logReq) + if err != nil { + http.Error(w, errors.Wrap(err, "error decoding json request").Error(), http.StatusBadRequest) + return + } + + pm.Logger.Debugf("Got logging request for container %s\n", logReq.Info.ContainerName) + stream, err := pm.CreateReaderForContainer(logReq.Info, logReq.Config) + if err != nil { + http.Error(w, errors.Wrap(err, "error creating log reader").Error(), http.StatusBadRequest) + return + } + defer stream.Close() + w.Header().Set("Content-Type", "application/x-json-stream") + wf := ioutils.NewWriteFlusher(w) + defer wf.Close() + io.Copy(wf, stream) + + } //end func +} + // For the start/stop handler, the daemon expects back an error object. If the body is empty, then all is well. func respondOK(w http.ResponseWriter) { res := struct { diff --git a/x-pack/dockerlogbeat/main.go b/x-pack/dockerlogbeat/main.go index 360fd265caa..e3a5b8d0310 100644 --- a/x-pack/dockerlogbeat/main.go +++ b/x-pack/dockerlogbeat/main.go @@ -7,6 +7,7 @@ package main import ( "fmt" "os" + "strconv" "github.com/docker/go-plugins-helpers/sdk" @@ -41,6 +42,14 @@ func genNewMonitoringConfig() (*common.Config, error) { return cfg, nil } +func setDestroyLogsOnStop() (bool, error) { + setting, ok := os.LookupEnv("DESTROY_LOGS_ON_STOP") + if !ok { + return false, nil + } + return strconv.ParseBool(setting) +} + func fatal(format string, vs ...interface{}) { fmt.Fprintf(os.Stderr, format, vs...) os.Exit(1) @@ -60,12 +69,18 @@ func main() { fatal("error starting log handler: %s", err) } - pipelines := pipelinemanager.NewPipelineManager(logcfg) + logDestroy, err := setDestroyLogsOnStop() + if err != nil { + fatal("DESTROY_LOGS_ON_STOP must be 'true' or 'false': %s", err) + } + pipelines := pipelinemanager.NewPipelineManager(logDestroy) sdkHandler := sdk.NewHandler(`{"Implements": ["LoggingDriver"]}`) // Create handlers for startup and shutdown of the log driver sdkHandler.HandleFunc("/LogDriver.StartLogging", startLoggingHandler(pipelines)) sdkHandler.HandleFunc("/LogDriver.StopLogging", stopLoggingHandler(pipelines)) + sdkHandler.HandleFunc("/LogDriver.Capabilities", reportCaps()) + sdkHandler.HandleFunc("/LogDriver.ReadLogs", readLogHandler(pipelines)) err = sdkHandler.ServeUnix("beatSocket", 0) if err != nil { diff --git a/x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go b/x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go index 62d54c88b6a..1a82dd214e5 100644 --- a/x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go +++ b/x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go @@ -5,13 +5,15 @@ package pipelinemanager import ( - "os" + "io" "strings" "time" "github.com/docker/docker/api/types/plugins/logdriver" "github.com/docker/docker/daemon/logger" + "github.com/docker/docker/api/types/backend" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/acker" @@ -20,20 +22,26 @@ import ( "github.com/elastic/beats/v7/x-pack/dockerlogbeat/pipereader" ) -// ClientLogger is an instance of a pipeline logger client meant for reading from a single log stream -// There's a many-to-one relationship between clients and pipelines. -// Each container with the same config will get its own client to the same pipeline. +// ClientLogger collects logs for a docker container logging to stdout and stderr, using the FIFO provided by the docker daemon. +// Each log line is written to a local log file for retrieval via "docker logs", and forwarded to the beats publisher pipeline. +// The local log storage is based on the docker json-file logger and supports the same settings. If "max-size" is not configured, we will rotate the log file every 10MB. type ClientLogger struct { - logFile *pipereader.PipeReader - client beat.Client - pipelineHash uint64 - closer chan struct{} - containerMeta logger.Info - logger *logp.Logger + // pipelineHash is a hash of the libbeat publisher pipeline config + pipelineHash uint64 + // logger is the internal error message logger + logger *logp.Logger + // ContainerMeta is the metadata object for the container we get from docker + ContainerMeta logger.Info + // logFile is the FIFO reader that reads from the docker container stdio + logFile *pipereader.PipeReader + // client is the libbeat client object that sends logs upstream + client beat.Client + // localLog manages the local JSON logs for containers + localLog logger.Logger } // newClientFromPipeline creates a new Client logger with a FIFO reader and beat client -func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereader.PipeReader, hash uint64, info logger.Info) (*ClientLogger, error) { +func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereader.PipeReader, hash uint64, info logger.Info, localLog logger.Logger) (*ClientLogger, error) { // setup the beat client settings := beat.ClientConfig{ WaitClose: 0, @@ -50,7 +58,12 @@ func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereade clientLogger.Debugf("Created new logger for %d", hash) - return &ClientLogger{logFile: inputFile, client: client, pipelineHash: hash, closer: make(chan struct{}), containerMeta: info, logger: clientLogger}, nil + return &ClientLogger{logFile: inputFile, + client: client, + pipelineHash: hash, + ContainerMeta: info, + localLog: localLog, + logger: clientLogger}, nil } // Close closes the pipeline client and reader @@ -64,7 +77,6 @@ func (cl *ClientLogger) Close() error { // ConsumePipelineAndSend consumes events from the FIFO pipe and sends them to the pipeline client func (cl *ClientLogger) ConsumePipelineAndSend() { publishWriter := make(chan logdriver.LogEntry, 500) - go cl.publishLoop(publishWriter) // Clean up the reader after we're done defer func() { @@ -76,7 +88,10 @@ func (cl *ClientLogger) ConsumePipelineAndSend() { for { err := cl.logFile.ReadMessage(&log) if err != nil { - cl.logger.Error(os.Stderr, "Error getting message: %s\n", err) + if err == io.EOF { + return + } + cl.logger.Errorf("Error getting message: %s\n", err) return } publishWriter <- log @@ -96,6 +111,7 @@ func (cl *ClientLogger) publishLoop(reader chan logdriver.LogEntry) { return } + cl.localLog.Log(constructLogSpoolMsg(entry)) line := strings.TrimSpace(string(entry.Line)) cl.client.Publish(beat.Event{ @@ -103,11 +119,11 @@ func (cl *ClientLogger) publishLoop(reader chan logdriver.LogEntry) { Fields: common.MapStr{ "message": line, "container": common.MapStr{ - "labels": helper.DeDotLabels(cl.containerMeta.ContainerLabels, true), - "id": cl.containerMeta.ContainerID, - "name": helper.ExtractContainerName([]string{cl.containerMeta.ContainerName}), + "labels": helper.DeDotLabels(cl.ContainerMeta.ContainerLabels, true), + "id": cl.ContainerMeta.ContainerID, + "name": helper.ExtractContainerName([]string{cl.ContainerMeta.ContainerName}), "image": common.MapStr{ - "name": cl.containerMeta.ContainerImageName, + "name": cl.ContainerMeta.ContainerImageName, }, }, }, @@ -116,3 +132,18 @@ func (cl *ClientLogger) publishLoop(reader chan logdriver.LogEntry) { } } + +func constructLogSpoolMsg(line logdriver.LogEntry) *logger.Message { + var msg logger.Message + + msg.Line = line.Line + msg.Source = line.Source + msg.Timestamp = time.Unix(0, line.TimeNano) + if line.PartialLogMetadata != nil { + msg.PLogMetaData = &backend.PartialLogMetaData{} + msg.PLogMetaData.ID = line.PartialLogMetadata.Id + msg.PLogMetaData.Last = line.PartialLogMetadata.Last + msg.PLogMetaData.Ordinal = int(line.PartialLogMetadata.Ordinal) + } + return &msg +} diff --git a/x-pack/dockerlogbeat/pipelinemanager/clientLogReader_test.go b/x-pack/dockerlogbeat/pipelinemanager/clientLogReader_test.go index 4f396b194be..b53d26e234d 100644 --- a/x-pack/dockerlogbeat/pipelinemanager/clientLogReader_test.go +++ b/x-pack/dockerlogbeat/pipelinemanager/clientLogReader_test.go @@ -5,13 +5,18 @@ package pipelinemanager import ( + "os" + "path/filepath" "sync" "testing" + "time" "github.com/docker/docker/daemon/logger" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/docker/docker/daemon/logger/jsonfilelog" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/x-pack/dockerlogbeat/pipelinemock" @@ -85,7 +90,17 @@ func createNewClient(t *testing.T, logString string, mockConnector *pipelinemock reader, err := pipereader.NewReaderFromReadCloser(pipelinemock.CreateTestInputFromLine(t, logString)) require.NoError(t, err) - client, err := newClientFromPipeline(mockConnector, reader, 123, cfgObject) + info := logger.Info{ + ContainerID: "b87d3b0379f816a5f2f7070f28cc05e2f564a3fb549a67c64ec30fc5b04142ed", + LogPath: filepath.Join("/tmp/dockerbeattest/", string(time.Now().Unix())), + } + + err = os.MkdirAll(filepath.Dir(info.LogPath), 0755) + assert.NoError(t, err) + localLog, err := jsonfilelog.New(info) + assert.NoError(t, err) + + client, err := newClientFromPipeline(mockConnector, reader, 123, cfgObject, localLog) require.NoError(t, err) return client diff --git a/x-pack/dockerlogbeat/pipelinemanager/config.go b/x-pack/dockerlogbeat/pipelinemanager/config.go index 3da18bc8546..92d6e98ee9f 100644 --- a/x-pack/dockerlogbeat/pipelinemanager/config.go +++ b/x-pack/dockerlogbeat/pipelinemanager/config.go @@ -35,7 +35,7 @@ func NewCfgFromRaw(input map[string]string) (ContainerOutputConfig, error) { newCfg := ContainerOutputConfig{} endpoint, ok := input["hosts"] if !ok { - return newCfg, errors.New("An endpoint flag is required") + return newCfg, errors.New("A hosts flag is required") } endpointList := strings.Split(endpoint, ",") diff --git a/x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go b/x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go index e96caa77863..b1d04d16541 100644 --- a/x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go +++ b/x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go @@ -5,7 +5,11 @@ package pipelinemanager import ( + "encoding/binary" "fmt" + "io" + "os" + "path/filepath" "sync" "github.com/mitchellh/hashstructure" @@ -14,7 +18,11 @@ import ( "github.com/pkg/errors" + "github.com/docker/docker/api/types/plugins/logdriver" "github.com/docker/docker/daemon/logger" + "github.com/docker/docker/daemon/logger/jsonfilelog" + + protoio "github.com/gogo/protobuf/io" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" @@ -41,14 +49,23 @@ type PipelineManager struct { pipelines map[uint64]*Pipeline // clients config: filepath clients map[string]*ClientLogger + // Client Logger key: container hash + clientLogger map[string]logger.Logger + // logDirectory is the bindmount for local container logsd + logDirectory string + // destroyLogsOnStop indicates for the client to remove log files when a container stops + destroyLogsOnStop bool } // NewPipelineManager creates a new Pipeline map -func NewPipelineManager(logCfg *common.Config) *PipelineManager { +func NewPipelineManager(logDestroy bool) *PipelineManager { return &PipelineManager{ - Logger: logp.NewLogger("PipelineManager"), - pipelines: make(map[uint64]*Pipeline), - clients: make(map[string]*ClientLogger), + Logger: logp.NewLogger("PipelineManager"), + pipelines: make(map[uint64]*Pipeline), + clients: make(map[string]*ClientLogger), + clientLogger: make(map[string]logger.Logger), + logDirectory: "/var/log/docker/containers", + destroyLogsOnStop: logDestroy, } } @@ -62,13 +79,16 @@ func (pm *PipelineManager) CloseClientWithFile(file string) error { hash := cl.pipelineHash + // remove the logger + pm.removeLogger(cl.ContainerMeta) + pm.Logger.Debugf("Closing Client first from pipelineManager") err = cl.Close() if err != nil { return errors.Wrap(err, "error closing client") } - //if the pipeline is no longer in use, clean up + // if the pipeline is no longer in use, clean up pm.removePipelineIfNeeded(hash) return nil @@ -89,20 +109,90 @@ func (pm *PipelineManager) CreateClientWithConfig(containerConfig ContainerOutpu reader, err := pipereader.NewReaderFromPath(file) if err != nil { - return nil, errors.Wrap(err, "") + return nil, errors.Wrap(err, "error creating reader for docker log stream") + } + + // Why is this empty by default? What should be here? Who knows! + if info.LogPath == "" { + info.LogPath = filepath.Join(pm.logDirectory, info.ContainerID, fmt.Sprintf("%s-json.log", info.ContainerID)) + } + err = os.MkdirAll(filepath.Dir(info.LogPath), 0755) + if err != nil { + return nil, errors.Wrap(err, "error creating directory for local logs") + } + // set a default log size + if _, ok := info.Config["max-size"]; !ok { + info.Config["max-size"] = "10M" + } + // set a default log count + if _, ok := info.Config["max-file"]; !ok { + info.Config["max-file"] = "5" + } + + localLog, err := jsonfilelog.New(info) + if err != nil { + return nil, errors.Wrap(err, "error creating local log") } //actually get to crafting the new client. - cl, err := newClientFromPipeline(pipeline.pipeline, reader, hashstring, info) + cl, err := newClientFromPipeline(pipeline.pipeline, reader, hashstring, info, localLog) if err != nil { return nil, errors.Wrap(err, "error creating client") } pm.registerClient(cl, hashstring, file) - + pm.registerLogger(localLog, info) return cl, nil } +// CreateReaderForContainer responds to docker logs requests to pull local logs from the json logger +func (pm *PipelineManager) CreateReaderForContainer(info logger.Info, config logger.ReadConfig) (io.ReadCloser, error) { + logObject, exists := pm.getLogger(info) + if !exists { + return nil, fmt.Errorf("Could not find logger for %s", info.ContainerID) + } + pipeReader, pipeWriter := io.Pipe() + logReader, ok := logObject.(logger.LogReader) + if !ok { + return nil, fmt.Errorf("logger does not support reading") + } + + go func() { + watcher := logReader.ReadLogs(config) + + enc := protoio.NewUint32DelimitedWriter(pipeWriter, binary.BigEndian) + defer enc.Close() + defer watcher.ConsumerGone() + var rawLog logdriver.LogEntry + for { + select { + case msg, ok := <-watcher.Msg: + if !ok { + pipeWriter.Close() + return + } + rawLog.Line = msg.Line + rawLog.Partial = msg.PLogMetaData != nil + rawLog.TimeNano = msg.Timestamp.UnixNano() + rawLog.Source = msg.Source + + if err := enc.WriteMsg(&rawLog); err != nil { + pipeWriter.CloseWithError(err) + return + } + + case err := <-watcher.Err: + pipeWriter.CloseWithError(err) + return + + } + } + + }() + + return pipeReader, nil +} + //=================== // Private methods @@ -134,6 +224,13 @@ func (pm *PipelineManager) getClient(file string) (*ClientLogger, bool) { return cli, exists } +func (pm *PipelineManager) getLogger(info logger.Info) (logger.Logger, bool) { + pm.mu.Lock() + defer pm.mu.Unlock() + logger, exists := pm.clientLogger[info.ContainerID] + return logger, exists +} + // removePipeline removes a pipeline from the manager if it's refcount is zero. func (pm *PipelineManager) removePipelineIfNeeded(hash uint64) { pm.mu.Lock() @@ -161,6 +258,35 @@ func (pm *PipelineManager) registerClient(cl *ClientLogger, hash uint64, clientF pm.pipelines[hash].refCount++ } +// registerLogger registers a local logger used for reading back logs +func (pm *PipelineManager) registerLogger(log logger.Logger, info logger.Info) { + pm.mu.Lock() + defer pm.mu.Unlock() + pm.clientLogger[info.ContainerID] = log +} + +// removeLogger removes a logging instace +func (pm *PipelineManager) removeLogger(info logger.Info) { + pm.mu.Lock() + defer pm.mu.Unlock() + logger, exists := pm.clientLogger[info.ContainerID] + if !exists { + return + } + logger.Close() + delete(pm.clientLogger, info.ContainerID) + if pm.destroyLogsOnStop { + pm.removeLogFile(info.ContainerID) + } +} + +// removeLogFile removes a log file for a given container. Disabled by default. +func (pm *PipelineManager) removeLogFile(id string) error { + toRemove := filepath.Join(pm.logDirectory, id) + + return os.Remove(toRemove) +} + // removeClient deregisters a client func (pm *PipelineManager) removeClient(file string) (*ClientLogger, error) { pm.mu.Lock() diff --git a/x-pack/dockerlogbeat/pipereader/reader.go b/x-pack/dockerlogbeat/pipereader/reader.go index f622fb03ce6..d1f8eb05c21 100644 --- a/x-pack/dockerlogbeat/pipereader/reader.go +++ b/x-pack/dockerlogbeat/pipereader/reader.go @@ -54,7 +54,7 @@ func (reader *PipeReader) ReadMessage(log *logdriver.LogEntry) error { for { lenFrame, err = reader.getValidLengthFrame() if err != nil { - return errors.Wrap(err, "error getting length frame") + return err } if lenFrame <= reader.maxSize { break diff --git a/x-pack/dockerlogbeat/readme.md b/x-pack/dockerlogbeat/readme.md index b6c97035c53..be06d96daa9 100644 --- a/x-pack/dockerlogbeat/readme.md +++ b/x-pack/dockerlogbeat/readme.md @@ -48,3 +48,13 @@ The location of the logs AND the container base directory in the docker docs is You can use this to find the list of plugins running on the host: `runc --root /containers/services/docker/rootfs/run/docker/plugins/runtime-root/plugins.moby/ list` The logs are in `/var/log/docker`. If you want to make the logs useful, you need to find the ID of the plugin. Back on the darwin host, run `docker plugin inspect $name_of_plugin | grep Id` use the hash ID to grep through the logs: `grep 22bb02c1506677cd48cc1cfccc0847c1b602f48f735e51e4933001804f86e2e docker.*` + + +## Local logs + +This plugin fully supports `docker logs`, and it maintains a local copy of logs that can be read without a connection to Elasticsearch. Unfortunately, due to the limitations in the docker plugin API, we can't "clean up" log files when a container is destroyed. The plugin mounts the `/var/lib/docker` directory on the host to write logs. This mount point can be changed via [Docker](https://docs.docker.com/engine/reference/commandline/plugin_set/#change-the-source-of-a-mount). The plugin can also be configured to do a "hard" cleanup and destroy logs when a container stops. To enable this, set the `DESTROY_LOGS_ON_STOP` environment var inside the plugin: +``` +docker plugin set d805664c550e DESTROY_LOGS_ON_STOP=true +``` + +You can also set `max-file`, `max-size` and `compress` via `--log-opts` \ No newline at end of file