Added basic multiline support based on stage-chains idea #1380
A stage can now decide whether processing should continue by calling, or not calling, the next stage in the chain. This allows implementing more complex stages, e.g. a multiline stage. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
Don't use new pipelineChain for each stage, reuse existing one. Don't call time.Since if not needed. Don't allocate new resultChain for each benchmark iteration. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
- no flush or timeout yet, only first line regex Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
If line doesn't match firstline, but there is no buffered firstline, just pass it forward. Also added some extra details to buffered entries. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
Added flushing of buffered multilines. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
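The stage-chain idea from the first commit can be illustrated with a minimal sketch. Every name here (`Entry`, `StageChain`, `Stage`, `chain`, `dropDebug`, `upper`, `runChain`) is hypothetical and simplified; it is not the PR's actual code. For brevity the sketch builds a new chain value per step, which the second commit explicitly avoids.

```go
package main

import "strings"

// Entry is a simplified log entry (hypothetical; real entries also carry labels and a timestamp).
type Entry struct {
	Line string
}

// StageChain hands an entry to the remaining stages.
type StageChain interface {
	Next(e Entry)
}

// Stage processes an entry and decides whether to call chain.Next.
type Stage interface {
	Process(e Entry, chain StageChain)
}

// chain walks a slice of stages; sink receives entries that pass every stage.
type chain struct {
	stages []Stage
	sink   func(Entry)
}

func (c chain) Next(e Entry) {
	if len(c.stages) == 0 {
		c.sink(e)
		return
	}
	c.stages[0].Process(e, chain{stages: c.stages[1:], sink: c.sink})
}

// dropDebug stops processing of debug lines by simply not calling Next.
type dropDebug struct{}

func (dropDebug) Process(e Entry, next StageChain) {
	if strings.Contains(e.Line, "DEBUG") {
		return
	}
	next.Next(e)
}

// upper transforms the line and always continues.
type upper struct{}

func (upper) Process(e Entry, next StageChain) {
	e.Line = strings.ToUpper(e.Line)
	next.Next(e)
}

// runChain feeds lines through both stages and returns what reaches the sink.
func runChain(lines []string) []string {
	var out []string
	c := chain{
		stages: []Stage{dropDebug{}, upper{}},
		sink:   func(e Entry) { out = append(out, e.Line) },
	}
	for _, l := range lines {
		c.Next(Entry{Line: l})
	}
	return out
}
```

Because a stage owns the decision to call `Next`, a buffering stage can hold entries back and forward them later, which is what a multiline stage needs.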
This issue has been automatically marked as stale because it has not had any activity in the past 30 days. It will be closed in 7 days if no further activity occurs. Thank you for your contributions.
type MultilineConfig struct {
	PipelineName    *string `mapstructure:"pipeline_name"`
	FirstLineRegexp string  `mapstructure:"firstline"`
	MaxWait         string  `mapstructure:"max_wait_time"`
I think we should have a combination of max_wait_time and max_line. We can have this as a second PR though.
@@ -108,14 +114,22 @@ func (p *Pipeline) Name() string {

// Wrap implements EntryMiddleware
func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler {
	// start flushing every 100ms
	go func() {
		for range time.Tick(100 * time.Millisecond) {
This needs a way to be stopped. You could use a context passed via the constructor of the Pipeline.
Or a Close function that cancels a context. I would still derive this context from the original one, so that cancellation from above propagates here.
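A minimal sketch of what the two comments above suggest, assuming a hypothetical helper `startFlushLoop` (not part of the PR): the loop uses `time.NewTicker` so the ticker can be stopped, and it exits when a context derived from the caller's context is cancelled.

```go
package main

import (
	"context"
	"sync/atomic"
	"time"
)

// startFlushLoop calls flush every interval until ctx is cancelled.
// The returned channel is closed once the goroutine has exited.
func startFlushLoop(ctx context.Context, interval time.Duration, flush func()) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		ticker := time.NewTicker(interval)
		defer ticker.Stop() // release ticker resources when the loop ends
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				flush()
			}
		}
	}()
	return done
}

// flushLoopDemo runs the loop briefly, cancels it, and reports how many flushes happened.
func flushLoopDemo() int64 {
	parent := context.Background() // stands in for the context passed to the pipeline constructor
	ctx, cancel := context.WithCancel(parent)
	var n int64
	done := startFlushLoop(ctx, 10*time.Millisecond, func() { atomic.AddInt64(&n, 1) })
	time.Sleep(55 * time.Millisecond)
	cancel() // a Close method on the pipeline could call this
	<-done   // wait until the goroutine has really stopped
	return atomic.LoadInt64(&n)
}
```

Deriving the context with `context.WithCancel(parent)` means that cancelling the parent also stops the loop, while the pipeline keeps its own `cancel` for an explicit Close.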
	Name() string
}

// FlushableStage is a stage that can be flushed.
type FlushableStage interface {
Can you elaborate on the comment: flush what? When? How?
Looks good to me @pstibrany.
I think you can go ahead and clean this up.
One thing I realized is that we don't have a way to shut down your stage: if a multiline entry is still forming when we exit, we will probably lose it.
I think you could trigger a flush on close with a special boolean flag such as force.
type FlushableStage interface {
	Stage
	Flush(chain RepeatableStageChain, force bool)
}
This also means you might want to have a Close func on a pipeline, and this should be called when the user of the pipeline is done.
Maybe api.EntryHandler should have a Close?
Good job !
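The forced flush on close proposed in this review could look roughly like the following sketch. It is deliberately simplified: `forceFlusher`, `bufferStage`, and `closablePipeline` are hypothetical names, and `Flush` takes a timestamp instead of the `RepeatableStageChain` from the suggested interface.

```go
package main

import (
	"strings"
	"time"
)

// forceFlusher is a simplified, hypothetical variant of the proposed FlushableStage.
type forceFlusher interface {
	Flush(now time.Time, force bool)
}

// bufferStage holds lines until maxWait has elapsed or a flush is forced.
type bufferStage struct {
	maxWait  time.Duration
	buffered []string
	since    time.Time
	emit     func(string)
}

// Add buffers a line and remembers when buffering started.
func (s *bufferStage) Add(line string, now time.Time) {
	if len(s.buffered) == 0 {
		s.since = now
	}
	s.buffered = append(s.buffered, line)
}

// Flush emits the buffered block if it is old enough, or unconditionally when force is true.
func (s *bufferStage) Flush(now time.Time, force bool) {
	if len(s.buffered) == 0 {
		return
	}
	if !force && now.Sub(s.since) < s.maxWait {
		return
	}
	s.emit(strings.Join(s.buffered, "\n"))
	s.buffered = nil
}

// closablePipeline force-flushes every stage when the user is done with it.
type closablePipeline struct {
	stages []forceFlusher
}

func (p *closablePipeline) Close() {
	for _, s := range p.stages {
		s.Flush(time.Now(), true)
	}
}

// closeDemo shows that a pending block survives shutdown.
func closeDemo() []string {
	var out []string
	s := &bufferStage{maxWait: time.Hour, emit: func(l string) { out = append(out, l) }}
	p := &closablePipeline{stages: []forceFlusher{s}}
	now := time.Now()
	s.Add("Exception in thread main", now)
	s.Add("  at Foo.bar", now)
	s.Flush(now, false) // too early: nothing is emitted
	p.Close()           // forced: the pending block is emitted
	return out
}
```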
I was able to use this pull request to successfully collect Java logs, including stacktraces as also discussed in #74. It would be awesome to have this feature in a release soon. Is there an ETA for when this will be merged? Thanks,
Hi Thomas, it needs some more work before it's ready (like addressing Cyril's comments). Unfortunately I am currently busy with some other stuff, so I don't have time to finish it soon.
Also, we agreed to get this feature in, so this is just a matter of time.
Thanks for the quick replies and thanks for working on this feature.
Yes. grafana/grafana#20865. There may be other/better issues. It's a known problem.
Maybe not for this iteration of the feature, but there's network gear which outputs multiple log lines at the same timestamp with several distinct groups of logs. The only way to reliably deal with this is to be able to define an end string/glob/regex as well as a starting one.
Hi, any update on this feature? We really need it!
Hi. Is it possible to match several lines with a pattern into a single line? For example if I have the following lines: timestamp 111 xxxxxxxxxxxxxxxxxxxxxxxx Join one line with the 111 lines and another line with the 222 lines. Thanks
Summary: This is a very simple approach based on grafana#1380 to provide multiline or block log entries in promtail. A `multiline` stage is added to pipelines. This stage matches a start line. Once a start line is matched, all following lines are appended to an entry and "dropped". Once a new start line is matched, the former block of multilines is sent. This approach has two downsides because log entries are not sent until a new start line is matched.
1. Lines can linger for a long time. The multiline stage should flush out lines if no new start line is matched in a certain time frame. However, the current pipeline interface cannot actively push entries, so time-based flushing would require a bigger refactoring.
2. If the observed system crashes, the last log lines are not sent. Thus important information might be lost.
Summary: This is a very simple approach based on grafana#1380 to provide multiline or block log entries in promtail. A `multiline` stage is added to pipelines. This stage matches a start line. Once a start line is matched, all following lines are appended to an entry and not passed on to downstream stages. Once a new start line is matched, the former block of multilines is sent. If no new line arrives within `max_wait_time`, the block is flushed to the next stage and a new block is started.
* Collapse multiline logs based on a start line. Summary: This is a very simple approach based on #1380 to provide multiline or block log entries in promtail. A `multiline` stage is added to pipelines. This stage matches a start line. Once a start line is matched, all following lines are appended to an entry and not passed on to downstream stages. Once a new start line is matched, the former block of multilines is sent. If no new line arrives within `max_wait_time`, the block is flushed to the next stage and a new block is started.
* Test multiline stage process.
* Format code.
* Flush multiline block after `max_lines`.
* Capture internal state of the stage.
* Process different multiline streams in parallel.
* Start documenting multiline stage.
* Give an example configuration for `multiline` stage.
* Make linter happy.
* Pass through entries until first start line.
* Update pkg/logentry/stages/multiline.go
Added basic support for multiline stage.
Its configuration looks like this:
If a line matches the `firstline` regex, it is buffered, and the multiline stage waits for additional lines until the next `firstline` match or until the max wait time has elapsed.
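The buffering behaviour described above can be sketched as follows. This is an illustration under stated assumptions, not the PR's implementation: the `multiline` type, its methods, and the date-prefix regex are hypothetical, the wait time is measured from the most recently buffered line, and lines that arrive before any first line are passed straight through, as one of the commits describes.

```go
package main

import (
	"regexp"
	"strings"
	"time"
)

// multiline collapses a first line and its continuation lines into one entry.
type multiline struct {
	firstLine *regexp.Regexp
	maxWait   time.Duration
	buf       []string
	lastSeen  time.Time
	emit      func(string)
}

// Process buffers a block that starts at a first line and emits the previous block.
func (m *multiline) Process(line string, now time.Time) {
	if m.firstLine.MatchString(line) {
		m.flush() // a new first line ends the previous block
		m.buf = []string{line}
		m.lastSeen = now
		return
	}
	if len(m.buf) == 0 {
		m.emit(line) // no buffered first line: pass the line forward unchanged
		return
	}
	m.buf = append(m.buf, line)
	m.lastSeen = now
}

// FlushIfIdle emits the current block when no line has arrived for maxWait.
func (m *multiline) FlushIfIdle(now time.Time) {
	if len(m.buf) > 0 && now.Sub(m.lastSeen) >= m.maxWait {
		m.flush()
	}
}

func (m *multiline) flush() {
	if len(m.buf) == 0 {
		return
	}
	m.emit(strings.Join(m.buf, "\n"))
	m.buf = nil
}

// multilineDemo feeds a few lines with synthetic timestamps and returns the emitted entries.
func multilineDemo() []string {
	var out []string
	m := &multiline{
		firstLine: regexp.MustCompile(`^\d{4}-\d{2}-\d{2}`), // assumed pattern: lines starting with a date
		maxWait:   3 * time.Second,
		emit:      func(l string) { out = append(out, l) },
	}
	t0 := time.Unix(0, 0)
	m.Process("orphan line", t0)
	m.Process("2020-01-01 ERROR boom", t0)
	m.Process("  at Foo.bar(Foo.java:1)", t0.Add(time.Second))
	m.Process("2020-01-01 INFO ok", t0.Add(2*time.Second))
	m.FlushIfIdle(t0.Add(3 * time.Second)) // idle for 1s only: nothing is flushed
	m.FlushIfIdle(t0.Add(6 * time.Second)) // idle for 4s: the last block is flushed
	return out
}
```

Passing the current time in explicitly keeps the sketch deterministic; a real stage would more likely be driven by a periodic flush from the pipeline.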
This PR builds on PR #1375. PoC, don't merge.