-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[chore] Do regular flush with combined batches #25171
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -178,7 +178,7 @@ func (r *Transformer) flushLoop() { | |
if timeSinceFirstEntry < r.forceFlushTimeout { | ||
continue | ||
} | ||
if err := r.flushSource(source, true); err != nil { | ||
if err := r.flushSource(source, true, nil); err != nil { | ||
r.Errorf("there was error flushing combined logs %s", err) | ||
} | ||
} | ||
|
@@ -198,7 +198,10 @@ func (r *Transformer) Stop() error { | |
|
||
ctx, cancel := context.WithTimeout(context.Background(), time.Second) | ||
defer cancel() | ||
r.flushUncombined(ctx) | ||
err := r.flushUncombined(ctx) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
close(r.chClose) | ||
|
||
|
@@ -241,7 +244,7 @@ func (r *Transformer) Process(ctx context.Context, e *entry.Entry) error { | |
// This is the first entry in the next batch | ||
case matches && r.matchIndicatesFirst(): | ||
// Flush the existing batch | ||
err := r.flushSource(s, true) | ||
err := r.flushSource(s, true, nil) | ||
if err != nil { | ||
return err | ||
} | ||
|
@@ -255,7 +258,7 @@ func (r *Transformer) Process(ctx context.Context, e *entry.Entry) error { | |
// When matching on first entry, never batch partial first. Just emit immediately | ||
case !matches && r.matchIndicatesFirst() && r.batchMap[s] == nil: | ||
r.addToBatch(ctx, e, s) | ||
return r.flushSource(s, true) | ||
return r.flushSource(s, true, nil) | ||
} | ||
|
||
// This is neither the first entry of a new log, | ||
|
@@ -305,7 +308,7 @@ func (r *Transformer) addToBatch(_ context.Context, e *entry.Entry, source strin | |
batch.recombined.WriteString(s) | ||
|
||
if (r.maxLogSize > 0 && int64(batch.recombined.Len()) > r.maxLogSize) || len(batch.entries) >= r.maxBatchSize { | ||
if err := r.flushSource(source, false); err != nil { | ||
if err := r.flushSource(source, false, nil); err != nil { | ||
r.Errorf("there was error flushing combined logs %s", err) | ||
} | ||
} | ||
|
@@ -315,19 +318,20 @@ func (r *Transformer) addToBatch(_ context.Context, e *entry.Entry, source strin | |
// flushUncombined flushes all the logs in the batch individually to the | ||
// next output in the pipeline. This is only used when there is an error | ||
// or at shutdown to avoid dropping the logs. | ||
func (r *Transformer) flushUncombined(ctx context.Context) { | ||
func (r *Transformer) flushUncombined(ctx context.Context) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this function doesn't flush uncombined anymore |
||
for source := range r.batchMap { | ||
for _, entry := range r.batchMap[source].entries { | ||
r.Write(ctx, entry) | ||
err := r.flushSource(source, true, ctx) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
@dmitryax IIUC this is what you intended to say, correct? |
||
if err != nil { | ||
return err | ||
} | ||
r.removeBatch(source) | ||
} | ||
r.ticker.Reset(r.forceFlushTimeout) | ||
return nil | ||
} | ||
|
||
// flushSource combines the entries currently in the batch into a single entry, | ||
// then forwards them to the next operator in the pipeline | ||
func (r *Transformer) flushSource(source string, deleteSource bool) error { | ||
func (r *Transformer) flushSource(source string, deleteSource bool, ctx context.Context) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This fails lint: |
||
batch := r.batchMap[source] | ||
// Skip flushing a combined log if the batch is empty | ||
if batch == nil { | ||
|
@@ -355,7 +359,7 @@ func (r *Transformer) flushSource(source string, deleteSource bool) error { | |
return err | ||
} | ||
|
||
r.Write(context.Background(), base) | ||
r.Write(ctx, base) | ||
if deleteSource { | ||
r.removeBatch(source) | ||
} else { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why pass nil for a context rather than
context.Background()
?