[receiver/filelog] Fix bug in delete_after_read (#31384)
Fixes #31383

This enhances `TestDeleteAfterRead` so that it reproduces the problem, and
fixes the issue by retaining a reader's metadata until the exported `Close`
function is called. Previously, the metadata was discarded when `delete`
called `Close`, which prevented the caller of `ReadToEnd` from properly
managing the file's metadata after deletion.
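
As a rough illustration of the pattern (a minimal sketch, not the actual `pkg/stanza/fileconsumer` code — `Metadata` and `Reader` below are simplified stand-ins), deletion now goes through an unexported `close` that only releases the file handle, while the exported `Close` is what hands the metadata back to the caller:

```go
package main

import (
	"fmt"
	"log"
	"os"
)

// Metadata stands in for the offset/fingerprint state the real reader tracks.
type Metadata struct {
	Offset int64
}

// Reader is a simplified stand-in for reader.Reader.
type Reader struct {
	*Metadata
	file     *os.File
	fileName string
}

// delete closes the underlying file and removes it from disk, but leaves the
// metadata attached so the caller can still collect it afterwards.
func (r *Reader) delete() {
	r.close()
	if err := os.Remove(r.fileName); err != nil {
		log.Printf("could not delete %s: %v", r.fileName, err)
	}
}

// Close releases the file handle and hands the metadata back to the caller.
func (r *Reader) Close() *Metadata {
	r.close()
	m := r.Metadata
	r.Metadata = nil
	return m
}

// close only releases the file handle; the metadata is untouched.
func (r *Reader) close() {
	if r.file == nil {
		return
	}
	if err := r.file.Close(); err != nil {
		log.Printf("problem closing reader: %v", err)
	}
	r.file = nil
}

func main() {
	f, err := os.CreateTemp("", "delete-after-read-*.log")
	if err != nil {
		log.Fatal(err)
	}
	r := &Reader{Metadata: &Metadata{Offset: 42}, file: f, fileName: f.Name()}

	// With delete_after_read, the reader deletes its file once it reaches EOF...
	r.delete()

	// ...but the caller can still harvest the metadata afterwards.
	if m := r.Close(); m != nil {
		fmt.Printf("metadata retained after deletion, offset=%d\n", m.Offset)
	}
}
```

With the previous behavior, `delete` went through the exported `Close` and nilled out the metadata, so the later `Close()` call in this sketch would have returned nil to the caller.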
djaglowski committed Feb 26, 2024
1 parent ea1632e commit 166a7b4
Showing 6 changed files with 75 additions and 12 deletions.
27 changes: 27 additions & 0 deletions .chloggen/pkg-stanza-fileconsumer-darfix.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: filelogreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix a bug where `delete_after_read` could cause a panic

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31383]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
3 changes: 1 addition & 2 deletions pkg/stanza/fileconsumer/file.go
@@ -67,7 +67,6 @@ func (m *Manager) Start(persister operator.Persister) error {

func (m *Manager) closePreviousFiles() {
// m.previousPollFiles -> m.knownFiles[0]

for r, _ := m.previousPollFiles.Pop(); r != nil; r, _ = m.previousPollFiles.Pop() {
m.knownFiles[0].Add(r.Close())
}
@@ -171,7 +170,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
m.Debug("Consuming files", zap.Strings("paths", paths))
m.makeReaders(paths)

m.preConsume(ctx)
m.readLostFiles(ctx)

// read new readers to end
var wg sync.WaitGroup
7 changes: 6 additions & 1 deletion pkg/stanza/fileconsumer/file_other.go
@@ -15,7 +15,12 @@ import (
// Take care of files which disappeared from the pattern since the last poll cycle
// this can mean either files which were removed, or rotated into a name not matching the pattern
// we do this before reading existing files to ensure we emit older log lines before newer ones
func (m *Manager) preConsume(ctx context.Context) {
func (m *Manager) readLostFiles(ctx context.Context) {
if m.readerFactory.DeleteAtEOF {
// Lost files are not expected when delete_at_eof is enabled
// since we are deleting the files before they can become lost.
return
}
lostReaders := make([]*reader.Reader, 0, m.previousPollFiles.Len())
OUTER:
for _, oldReader := range m.previousPollFiles.Get() {
35 changes: 31 additions & 4 deletions pkg/stanza/fileconsumer/file_test.go
@@ -985,16 +985,14 @@ func TestDeleteAfterRead(t *testing.T) {
linesPerFile := 10
totalLines := files * linesPerFile

expectedTokens := make([][]byte, 0, totalLines)
actualTokens := make([][]byte, 0, totalLines)

tempDir := t.TempDir()
temps := make([]*os.File, 0, files)
for i := 0; i < files; i++ {
temps = append(temps, filetest.OpenTemp(t, tempDir))
}

// Write logs to each file
expectedTokens := make([][]byte, 0, totalLines)
actualTokens := make([][]byte, 0, totalLines)
for i, temp := range temps {
for j := 0; j < linesPerFile; j++ {
line := filetest.TokenWithLength(100)
@@ -1023,6 +1021,35 @@ func TestDeleteAfterRead(t *testing.T) {
_, err := os.Stat(temp.Name())
require.True(t, os.IsNotExist(err))
}

// Make more files to ensure deleted files do not cause problems on next poll
temps = make([]*os.File, 0, files)
for i := 0; i < files; i++ {
temps = append(temps, filetest.OpenTemp(t, tempDir))
}

expectedTokens = make([][]byte, 0, totalLines)
actualTokens = make([][]byte, 0, totalLines)
for i, temp := range temps {
for j := 0; j < linesPerFile; j++ {
line := filetest.TokenWithLength(200)
message := fmt.Sprintf("%s %d %d", line, i, j)
_, err := temp.WriteString(message + "\n")
require.NoError(t, err)
expectedTokens = append(expectedTokens, []byte(message))
}
require.NoError(t, temp.Close())
}

operator.poll(context.Background())
actualTokens = append(actualTokens, sink.NextTokens(t, totalLines)...)

require.ElementsMatch(t, expectedTokens, actualTokens)

for _, temp := range temps {
_, err := os.Stat(temp.Name())
require.True(t, os.IsNotExist(err))
}
}

func TestMaxBatching(t *testing.T) {
3 changes: 2 additions & 1 deletion pkg/stanza/fileconsumer/file_windows.go
@@ -9,7 +9,8 @@ import (
"context"
)

func (m *Manager) preConsume(ctx context.Context) {
// Noop on windows because we close files immediately after reading.
func (m *Manager) readLostFiles(ctx context.Context) {
}

// On windows, we close files immediately after reading because they cannot be moved while open.
12 changes: 8 additions & 4 deletions pkg/stanza/fileconsumer/internal/reader/reader.go
@@ -114,14 +114,21 @@ func (r *Reader) ReadToEnd(ctx context.Context) {

// Delete will close and delete the file
func (r *Reader) delete() {
r.Close()
r.close()
if err := os.Remove(r.fileName); err != nil {
r.logger.Errorf("could not delete %s", r.fileName)
}
}

// Close will close the file and return the metadata
func (r *Reader) Close() *Metadata {
r.close()
m := r.Metadata
r.Metadata = nil
return m
}

func (r *Reader) close() {
if r.file != nil {
if err := r.file.Close(); err != nil {
r.logger.Debugw("Problem closing reader", zap.Error(err))
@@ -134,9 +141,6 @@ func (r *Reader) Close() *Metadata {
r.logger.Errorw("Failed to stop header pipeline", zap.Error(err))
}
}
m := r.Metadata
r.Metadata = nil
return m
}

// Read from the file and update the fingerprint if necessary
