Add an option to respect filesystem locks to the filelog receiver. #34801

27 changes: 27 additions & 0 deletions .chloggen/filelog-receiver-fs-lock.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: filelogreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: If acquire_fs_lock is true, attempt to acquire a shared lock before reading a file.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34801]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Unix only. If a lock cannot be acquired then the file will be ignored until the next poll cycle.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
1 change: 1 addition & 0 deletions pkg/stanza/docs/operators/file_input.md
@@ -29,6 +29,7 @@ The `file_input` operator reads logs from files. It will place the lines read in
| `max_concurrent_files` | 1024 | The maximum number of log files from which logs will be read concurrently (minimum = 2). If the number of files matched in the `include` pattern exceeds half of this number, then files will be processed in batches. |
| `max_batches` | 0 | Only applicable when files must be batched in order to respect `max_concurrent_files`. This value limits the number of batches that will be processed during a single poll interval. A value of 0 indicates no limit. |
| `delete_after_read` | `false` | If `true`, each log file will be read and then immediately deleted. Requires that the `filelog.allowFileDeletion` feature gate is enabled. |
| `acquire_fs_lock` | `false` | If `true`, attempt to acquire a shared filesystem lock before reading a file (Unix only). If the lock cannot be acquired, the file is skipped until the next poll cycle. |
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes. |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource. |
| `header` | nil | Specifies options for parsing header metadata. Requires that the `filelog.allowHeaderMetadataParsing` feature gate is enabled. See below for details. |
2 changes: 2 additions & 0 deletions pkg/stanza/fileconsumer/config.go
@@ -87,6 +87,7 @@ type Config struct {
	DeleteAfterRead         bool   `mapstructure:"delete_after_read,omitempty"`
	IncludeFileRecordNumber bool   `mapstructure:"include_file_record_number,omitempty"`
	Compression             string `mapstructure:"compression,omitempty"`
	AcquireFSLock           bool   `mapstructure:"acquire_fs_lock,omitempty"`
}

type HeaderConfig struct {
@@ -170,6 +171,7 @@ func (c Config) Build(set component.TelemetrySettings, emit emit.Callback, opts
		DeleteAtEOF:             c.DeleteAfterRead,
		IncludeFileRecordNumber: c.IncludeFileRecordNumber,
		Compression:             c.Compression,
		AcquireFSLock:           c.AcquireFSLock,
	}

	var t tracker.Tracker
1 change: 1 addition & 0 deletions pkg/stanza/fileconsumer/config_test.go
@@ -41,6 +41,7 @@ func TestNewConfig(t *testing.T) {
	assert.False(t, cfg.IncludeFileOwnerName)
	assert.False(t, cfg.IncludeFileOwnerGroupName)
	assert.False(t, cfg.IncludeFileRecordNumber)
	assert.False(t, cfg.AcquireFSLock)
}

func TestUnmarshal(t *testing.T) {
2 changes: 2 additions & 0 deletions pkg/stanza/fileconsumer/internal/reader/factory.go
@@ -44,6 +44,7 @@ type Factory struct {
	DeleteAtEOF             bool
	IncludeFileRecordNumber bool
	Compression             string
	AcquireFSLock           bool
}

func (f *Factory) NewFingerprint(file *os.File) (*fingerprint.Fingerprint, error) {
@@ -77,6 +78,7 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
		deleteAtEOF:          f.DeleteAtEOF,
		includeFileRecordNum: f.IncludeFileRecordNumber,
		compression:          f.Compression,
		acquireFSLock:        f.AcquireFSLock,
	}
	r.set.Logger = r.set.Logger.With(zap.String("path", r.fileName))

8 changes: 8 additions & 0 deletions pkg/stanza/fileconsumer/internal/reader/reader.go
@@ -52,10 +52,18 @@ type Reader struct {
	needsUpdateFingerprint bool
	includeFileRecordNum   bool
	compression            string
	acquireFSLock          bool
}

// ReadToEnd will read until the end of the file
func (r *Reader) ReadToEnd(ctx context.Context) {
	if r.acquireFSLock {
		// Skip this poll cycle if the shared lock cannot be acquired;
		// the file is retried on the next poll.
		if !r.tryLockFile() {
			return
		}
		defer r.unlockFile()
	}

	switch r.compression {
	case "gzip":
		// We need to create a gzip reader each time ReadToEnd is called because the underlying
13 changes: 13 additions & 0 deletions pkg/stanza/fileconsumer/internal/reader/reader_other.go
@@ -0,0 +1,13 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build !unix

package reader // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"

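// tryLockFile is a no-op on non-Unix platforms and always reports success.
func (r *Reader) tryLockFile() bool {
	return true
}

// unlockFile is a no-op on non-Unix platforms.
func (r *Reader) unlockFile() {
}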
30 changes: 30 additions & 0 deletions pkg/stanza/fileconsumer/internal/reader/reader_unix.go
@@ -0,0 +1,30 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build unix

package reader // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"

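import (
	"errors"

	"go.uber.org/zap"
	"golang.org/x/sys/unix"
)

// tryLockFile attempts to take a shared, non-blocking flock on the file.
// It returns false if the lock cannot be acquired. Errors other than
// EWOULDBLOCK (the file is locked elsewhere) are logged.
func (r *Reader) tryLockFile() bool {
	if err := unix.Flock(int(r.file.Fd()), unix.LOCK_SH|unix.LOCK_NB); err != nil {
		if !errors.Is(err, unix.EWOULDBLOCK) {
			r.set.Logger.Error("Failed to lock", zap.Error(err))
		}
		return false
	}

	return true
}

// unlockFile releases the lock taken by tryLockFile and logs any failure.
func (r *Reader) unlockFile() {
	if err := unix.Flock(int(r.file.Fd()), unix.LOCK_UN); err != nil {
		r.set.Logger.Error("Failed to unlock", zap.Error(err))
	}
}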
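
To illustrate the `flock` behaviour that `tryLockFile` relies on, here is a small standalone sketch. It assumes a Unix system with a local filesystem, and it uses the standard library `syscall` package rather than `golang.org/x/sys/unix` so that it runs without extra dependencies. `tryShared` and `lockDemo` are hypothetical names for illustration and do not appear in the PR.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"syscall"
)

// tryShared mirrors the idea behind tryLockFile: request a shared,
// non-blocking lock and report whether it was granted. EWOULDBLOCK means
// an incompatible lock is held elsewhere and is not treated as an error.
func tryShared(f *os.File) (bool, error) {
	err := syscall.Flock(int(f.Fd()), syscall.LOCK_SH|syscall.LOCK_NB)
	if err == nil {
		return true, nil
	}
	if errors.Is(err, syscall.EWOULDBLOCK) {
		return false, nil
	}
	return false, err
}

// lockDemo reports whether a reader obtained the shared lock while a writer
// held an exclusive lock, and again after the writer released it.
func lockDemo() (bool, bool, error) {
	writer, err := os.CreateTemp("", "flock-demo-*.log")
	if err != nil {
		return false, false, err
	}
	defer os.Remove(writer.Name())
	defer writer.Close()

	// The "writer" takes an exclusive lock on its own file descriptor.
	if err := syscall.Flock(int(writer.Fd()), syscall.LOCK_EX); err != nil {
		return false, false, err
	}

	// A second, independent open of the same file plays the reader.
	reader, err := os.Open(writer.Name())
	if err != nil {
		return false, false, err
	}
	defer reader.Close()

	during, err := tryShared(reader)
	if err != nil {
		return false, false, err
	}

	// Release the exclusive lock, then let the reader try again.
	if err := syscall.Flock(int(writer.Fd()), syscall.LOCK_UN); err != nil {
		return false, false, err
	}

	after, err := tryShared(reader)
	return during, after, err
}

func main() {
	during, after, err := lockDemo()
	if err != nil {
		fmt.Fprintln(os.Stderr, "demo failed:", err)
		os.Exit(1)
	}
	fmt.Printf("shared lock while writer holds LOCK_EX: %v\n", during)
	fmt.Printf("shared lock after writer unlocks: %v\n", after)
}
```

The first attempt corresponds to the case where `ReadToEnd` returns early and the file waits for the next poll cycle. The second corresponds to a normal read.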
1 change: 1 addition & 0 deletions receiver/filelogreceiver/README.md
@@ -40,6 +40,7 @@ Tails and parses logs from files.
| `max_concurrent_files` | 1024 | The maximum number of log files from which logs will be read concurrently. If the number of files matched in the `include` pattern exceeds this number, then files will be processed in batches. |
| `max_batches` | 0 | Only applicable when files must be batched in order to respect `max_concurrent_files`. This value limits the number of batches that will be processed during a single poll interval. A value of 0 indicates no limit. |
| `delete_after_read` | `false` | If `true`, each log file will be read and then immediately deleted. Requires that the `filelog.allowFileDeletion` feature gate is enabled. Must be `false` when `start_at` is set to `end`. |
| `acquire_fs_lock` | `false` | If `true`, attempt to acquire a shared filesystem lock before reading a file (Unix only). If the lock cannot be acquired, the file is skipped until the next poll cycle. |
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes. |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource. |
| `operators` | [] | An array of [operators](../../pkg/stanza/docs/operators/README.md#what-operators-are-available). See below for more details. |