-
Notifications
You must be signed in to change notification settings - Fork 4.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
…19885) This PR adds a new option to the `log` input of Filebeat named `file_identity`. The option lets users configure file identity for state tracking. 1. `native` (default): Filebeat identifies files based on their inode and device id. 2. `path`: Files are considered different if they have different paths. 3. `inode_marker`: A special marker file and the inode is used to tell apart files. It is not supported on Windows. State IDs previously were not saved to the registry file. Now, these are persisted on the disk. I introduced a new interface: `file.StateIdentifier`. The responsibility of `StateIdentifier` is to generate an identifier for a `file.State` based on the configuration. If someone wants to implement their own `StateIdentifier` method, all they need is to create a struct which satisfies this interface. ```golang // StateIdentifier generates an ID for a State. type StateIdentifier interface { // GenerateID generates and returns the ID of the state GenerateID(State) (stateId, identifierType string) } ``` As every state has an ID, Filebeat just compares the IDs of the two states to decide if they belong to the same file or not. The scope of the PR does not include strategies which include fingerprinting the contents of the file. (cherry picked from commit 8ff6894)
- Loading branch information
Showing
26 changed files
with
856 additions
and
186 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package file | ||
|
||
import ( | ||
"fmt" | ||
"strconv" | ||
"strings" | ||
|
||
"github.com/mitchellh/hashstructure" | ||
|
||
"github.com/elastic/beats/v7/libbeat/common" | ||
) | ||
|
||
const ( | ||
nativeName = "native" | ||
pathName = "path" | ||
inodeMarkerName = "inode_marker" | ||
|
||
DefaultIdentifierName = nativeName | ||
identitySep = "::" | ||
) | ||
|
||
var ( | ||
identifierFactories = map[string]IdentifierFactory{ | ||
nativeName: newINodeDeviceIdentifier, | ||
pathName: newPathIdentifier, | ||
inodeMarkerName: newINodeMarkerIdentifier, | ||
} | ||
) | ||
|
||
type IdentifierFactory func(*common.Config) (StateIdentifier, error) | ||
|
||
// StateIdentifier generates an ID for a State. | ||
type StateIdentifier interface { | ||
// GenerateID generates and returns the ID of the state and its type | ||
GenerateID(State) (id, identifierType string) | ||
} | ||
|
||
// NewStateIdentifier creates a new state identifier for a log input. | ||
func NewStateIdentifier(ns *common.ConfigNamespace) (StateIdentifier, error) { | ||
if ns == nil { | ||
return newINodeDeviceIdentifier(nil) | ||
} | ||
|
||
identifierType := ns.Name() | ||
f, ok := identifierFactories[identifierType] | ||
if !ok { | ||
return nil, fmt.Errorf("no such file_identity generator: %s", identifierType) | ||
} | ||
|
||
return f(ns.Config()) | ||
} | ||
|
||
type inodeDeviceIdentifier struct { | ||
name string | ||
} | ||
|
||
func newINodeDeviceIdentifier(_ *common.Config) (StateIdentifier, error) { | ||
return &inodeDeviceIdentifier{ | ||
name: nativeName, | ||
}, nil | ||
} | ||
|
||
func (i *inodeDeviceIdentifier) GenerateID(s State) (id, identifierType string) { | ||
stateID := i.name + identitySep + s.FileStateOS.String() | ||
return genIDWithHash(s.Meta, stateID), i.name | ||
} | ||
|
||
type pathIdentifier struct { | ||
name string | ||
} | ||
|
||
func newPathIdentifier(_ *common.Config) (StateIdentifier, error) { | ||
return &pathIdentifier{ | ||
name: pathName, | ||
}, nil | ||
} | ||
|
||
func (p *pathIdentifier) GenerateID(s State) (id, identifierType string) { | ||
stateID := p.name + identitySep + s.Source | ||
return genIDWithHash(s.Meta, stateID), p.name | ||
} | ||
|
||
func genIDWithHash(meta map[string]string, fileID string) string { | ||
if len(meta) == 0 { | ||
return fileID | ||
} | ||
|
||
hashValue, _ := hashstructure.Hash(meta, nil) | ||
var hashBuf [17]byte | ||
hash := strconv.AppendUint(hashBuf[:0], hashValue, 16) | ||
hash = append(hash, '-') | ||
|
||
var b strings.Builder | ||
b.Grow(len(hash) + len(fileID)) | ||
b.Write(hash) | ||
b.WriteString(fileID) | ||
|
||
return b.String() | ||
} | ||
|
||
// mockIdentifier is used for testing | ||
type MockIdentifier struct{} | ||
|
||
func (m *MockIdentifier) GenerateID(s State) (string, string) { return s.Id, "mock" } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
// +build !windows | ||
|
||
package file | ||
|
||
import ( | ||
"fmt" | ||
"io/ioutil" | ||
"os" | ||
"path/filepath" | ||
"time" | ||
|
||
"github.com/elastic/beats/v7/libbeat/common" | ||
"github.com/elastic/beats/v7/libbeat/logp" | ||
) | ||
|
||
type inodeMarkerIdentifier struct { | ||
log *logp.Logger | ||
name string | ||
markerPath string | ||
|
||
markerFileLastModifitaion time.Time | ||
markerTxt string | ||
} | ||
|
||
func newINodeMarkerIdentifier(cfg *common.Config) (StateIdentifier, error) { | ||
var config struct { | ||
MarkerPath string `config:"path" validate:"required"` | ||
} | ||
err := cfg.Unpack(&config) | ||
if err != nil { | ||
return nil, fmt.Errorf("error while reading configuration of INode + marker file configuration: %v", err) | ||
} | ||
|
||
fi, err := os.Stat(config.MarkerPath) | ||
if err != nil { | ||
return nil, fmt.Errorf("error while opening marker file at %s: %v", config.MarkerPath, err) | ||
} | ||
markerContent, err := ioutil.ReadFile(config.MarkerPath) | ||
if err != nil { | ||
return nil, fmt.Errorf("error while reading marker file at %s: %v", config.MarkerPath, err) | ||
} | ||
return &inodeMarkerIdentifier{ | ||
log: logp.NewLogger("inode_marker_identifier_" + filepath.Base(config.MarkerPath)), | ||
name: inodeMarkerName, | ||
markerPath: config.MarkerPath, | ||
markerFileLastModifitaion: fi.ModTime(), | ||
markerTxt: string(markerContent), | ||
}, nil | ||
} | ||
|
||
func (i *inodeMarkerIdentifier) markerContents() string { | ||
f, err := os.Open(i.markerPath) | ||
if err != nil { | ||
i.log.Errorf("Failed to open marker file %s: %v", i.markerPath, err) | ||
return "" | ||
} | ||
defer f.Close() | ||
|
||
fi, err := f.Stat() | ||
if err != nil { | ||
i.log.Errorf("Failed to fetch file information for %s: %v", i.markerPath, err) | ||
return "" | ||
} | ||
if i.markerFileLastModifitaion.Before(fi.ModTime()) { | ||
contents, err := ioutil.ReadFile(i.markerPath) | ||
if err != nil { | ||
i.log.Errorf("Error while reading contents of marker file: %v", err) | ||
return "" | ||
} | ||
i.markerTxt = string(contents) | ||
} | ||
|
||
return i.markerTxt | ||
} | ||
|
||
func (i *inodeMarkerIdentifier) GenerateID(s State) (id, identifierType string) { | ||
m := i.markerContents() | ||
|
||
stateID := fmt.Sprintf("%s%s%s-%s", i.name, identitySep, s.FileStateOS.InodeString(), m) | ||
return genIDWithHash(s.Meta, stateID), i.name | ||
} |
Oops, something went wrong.