-
Notifications
You must be signed in to change notification settings - Fork 239
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix loki source file windows #2282
Closed
+341
−31
Closed
Changes from 5 commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
27cee0f
restart readers that have been stopped
wildum af2e674
update sync time
wildum 6d8a5dd
add retry_interval parameter
wildum b83dbf5
Merge branch 'main' into fix-loki-source-file-windows
wildum 15c9f20
add windows test
wildum 3d08c7b
update retry interval arg
wildum File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,6 +46,7 @@ type Arguments struct { | |
FileWatch FileWatch `alloy:"file_watch,block,optional"` | ||
TailFromEnd bool `alloy:"tail_from_end,attr,optional"` | ||
LegacyPositionsFile string `alloy:"legacy_positions_file,attr,optional"` | ||
RetryInterval time.Duration `alloy:"retry_interval,attr,optional"` | ||
} | ||
|
||
type FileWatch struct { | ||
|
@@ -143,6 +144,24 @@ func (c *Component) Run(ctx context.Context) error { | |
c.mut.RUnlock() | ||
}() | ||
|
||
tickerChan := make(chan struct{}) | ||
if c.args.RetryInterval > 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This doesnt handle updating the RetryInterval after running, can we add a test for that? |
||
ticker := time.NewTicker(c.args.RetryInterval) | ||
defer ticker.Stop() | ||
|
||
go func() { | ||
for { | ||
select { | ||
case <-ticker.C: | ||
tickerChan <- struct{}{} | ||
case <-ctx.Done(): | ||
close(tickerChan) | ||
return | ||
} | ||
} | ||
}() | ||
} | ||
|
||
for { | ||
select { | ||
case <-ctx.Done(): | ||
|
@@ -153,6 +172,28 @@ func (c *Component) Run(ctx context.Context) error { | |
receiver.Chan() <- entry | ||
} | ||
c.mut.RUnlock() | ||
case <-tickerChan: | ||
c.mut.Lock() | ||
// Find readers that are stopped and re-create them if the files that they were tailing are back. | ||
// This helps for log rotation on Windows because the tailer is closed as soon as the file is removed. | ||
// On Unix-like systems, it won't re-create any reader because the reader will stay open till the next Update call. | ||
restartReaders := make(map[positions.Entry]reader) | ||
for key, reader := range c.readers { | ||
if !reader.IsRunning() { | ||
_, err := os.Stat(reader.Path()) | ||
if err != nil { | ||
continue | ||
} | ||
restartReaders[key] = reader | ||
} | ||
} | ||
for key, reader := range restartReaders { | ||
level.Debug(c.opts.Logger).Log("msg", "recreate reader", "path", reader.Path()) | ||
reader.Stop() | ||
delete(c.readers, key) | ||
c.addReader(key, reader.Path(), reader.Labels()) | ||
} | ||
c.mut.Unlock() | ||
} | ||
} | ||
} | ||
|
@@ -200,7 +241,6 @@ func (c *Component) Update(args component.Arguments) error { | |
|
||
for _, target := range newArgs.Targets { | ||
path := target[pathLabel] | ||
|
||
labels := make(model.LabelSet) | ||
for k, v := range target { | ||
if strings.HasPrefix(k, model.ReservedLabelPrefix) { | ||
|
@@ -214,19 +254,7 @@ func (c *Component) Update(args component.Arguments) error { | |
if _, exist := c.readers[readersKey]; exist { | ||
continue | ||
} | ||
|
||
c.reportSize(path, labels.String()) | ||
|
||
handler := loki.AddLabelsMiddleware(labels).Wrap(loki.NewEntryHandler(c.handler.Chan(), func() {})) | ||
reader, err := c.startTailing(path, labels, handler) | ||
if err != nil { | ||
continue | ||
} | ||
|
||
c.readers[readersKey] = readerWithHandler{ | ||
reader: reader, | ||
handler: handler, | ||
} | ||
c.addReader(readersKey, path, labels) | ||
} | ||
|
||
// Remove from the positions file any entries that had a Reader before, but | ||
|
@@ -238,6 +266,21 @@ func (c *Component) Update(args component.Arguments) error { | |
return nil | ||
} | ||
|
||
func (c *Component) addReader(key positions.Entry, path string, labels model.LabelSet) { | ||
c.reportSize(path, labels.String()) | ||
|
||
handler := loki.AddLabelsMiddleware(labels).Wrap(loki.NewEntryHandler(c.handler.Chan(), func() {})) | ||
reader, err := c.startTailing(path, labels, handler) | ||
if err != nil { | ||
return | ||
} | ||
|
||
c.readers[key] = readerWithHandler{ | ||
reader: reader, | ||
handler: handler, | ||
} | ||
} | ||
|
||
// readerWithHandler combines a reader with an entry handler associated with | ||
// it. Closing the reader will also close the handler. | ||
type readerWithHandler struct { | ||
|
@@ -331,7 +374,7 @@ func (c *Component) startTailing(path string, labels model.LabelSet, handler lok | |
handler, | ||
c.posFile, | ||
path, | ||
labels.String(), | ||
labels, | ||
c.args.Encoding, | ||
c.args.DecompressionConfig, | ||
) | ||
|
@@ -352,7 +395,7 @@ func (c *Component) startTailing(path string, labels model.LabelSet, handler lok | |
handler, | ||
c.posFile, | ||
path, | ||
labels.String(), | ||
labels, | ||
c.args.Encoding, | ||
pollOptions, | ||
c.args.TailFromEnd, | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
//go:build windows | ||
|
||
package file | ||
|
||
import ( | ||
"context" | ||
"os" | ||
"testing" | ||
"time" | ||
|
||
"github.com/grafana/alloy/internal/component/common/loki" | ||
"github.com/grafana/alloy/internal/component/discovery" | ||
"github.com/grafana/alloy/internal/runtime/componenttest" | ||
"github.com/grafana/alloy/internal/util" | ||
"github.com/prometheus/common/model" | ||
"github.com/stretchr/testify/require" | ||
"go.uber.org/goleak" | ||
) | ||
|
||
func TestDeleteRecreateFileNoRetry(t *testing.T) { | ||
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) | ||
|
||
filename := "example" | ||
|
||
ctx, cancel := context.WithCancel(componenttest.TestContext(t)) | ||
defer cancel() | ||
|
||
// Create file to log to. | ||
f, err := os.Create(filename) | ||
require.NoError(t, err) | ||
|
||
ctrl, err := componenttest.NewControllerFromID(util.TestLogger(t), "loki.source.file") | ||
require.NoError(t, err) | ||
|
||
ch1 := loki.NewLogsReceiver() | ||
|
||
go func() { | ||
err := ctrl.Run(ctx, Arguments{ | ||
Targets: []discovery.Target{{ | ||
"__path__": f.Name(), | ||
"foo": "bar", | ||
}}, | ||
ForwardTo: []loki.LogsReceiver{ch1}, | ||
}) | ||
require.NoError(t, err) | ||
}() | ||
|
||
ctrl.WaitRunning(time.Minute) | ||
|
||
_, err = f.Write([]byte("writing some text\n")) | ||
require.NoError(t, err) | ||
|
||
wantLabelSet := model.LabelSet{ | ||
"filename": model.LabelValue(f.Name()), | ||
"foo": "bar", | ||
} | ||
|
||
select { | ||
case logEntry := <-ch1.Chan(): | ||
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second) | ||
require.Equal(t, "writing some text", logEntry.Line) | ||
require.Equal(t, wantLabelSet, logEntry.Labels) | ||
case <-time.After(5 * time.Second): | ||
require.FailNow(t, "failed waiting for log line") | ||
} | ||
|
||
require.NoError(t, f.Close()) | ||
require.NoError(t, os.Remove(f.Name())) | ||
|
||
// Create a file with the same name | ||
f, err = os.Create(filename) | ||
require.NoError(t, err) | ||
defer os.Remove(f.Name()) | ||
defer f.Close() | ||
|
||
_, err = f.Write([]byte("writing some new text\n")) | ||
require.NoError(t, err) | ||
|
||
select { | ||
case <-ch1.Chan(): | ||
t.Fatalf("Unexpected log entry received") | ||
case <-time.After(2 * time.Second): | ||
// Test passes if no log entry is received within the timeout | ||
// This indicates that the log source does not retry reading the file | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Drafting up a new suggestion.
We can state the specific OSes we support.
This sentence (and the overall description) isn't really clear to me. Why will
loki.source_file
try to reopen deleted files? The description in the table says the arg will tell the component to try to reopen closed files. Which is it? I assume it's closed files since trying to reopen deleted files doesn't really make sense. Why won't the component try to reopen closed files on Windows? It's also not really clear to me. Windows caches the files? And the other OSes do not?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have a case where a user is rotating a "app.log" file everytime it reaches 5MB by deleting it and creating a new app.log file.
On Windows you must first close the file before deleting it. Closing the file will stop the tailer in the loki.source.file component. If the new file would have a different name, the local.file_match would send it to the loki.source.file component and it would work fine. But for the local.file_match component nothing changed because it only checks at a regular interval and it will just see that there is still an "app.log" file. Because of this, it won't notify the loki.source.file component to start a new tailer and the file won't be tailed (it only notifies the next component when the set of files changed).
On Unix-like systems, you can delete the file without closing it. Because the file is not closed, the tailer is not stopped, it continuously tries to read the file until the local.file_match sends an update. In the case of the user, it will restart the tailer once the new "app.log" file is created because of the names it thinks that it's the same file.