Skip to content

Commit

Permalink
Adding a guard for 0 bytes on no flush and also adding gcp
Browse files Browse the repository at this point in the history
  • Loading branch information
i3149 committed Oct 28, 2024
1 parent b45f069 commit 3755063
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 2 deletions.
2 changes: 1 addition & 1 deletion pkg/sinks/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (s *FileSink) Init(ctx context.Context, format formats.Format, compression

func (s *FileSink) Send(ctx context.Context, payload *kt.Output) {
// In the un-buffered case, write this out right away.
if payload.NoBuffer {
if payload.NoBuffer && len(payload.Body) > 0 {
go s.writeFileNow(ctx, payload.Body)
return
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/sinks/gcloud/gcloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ func (s *GCloudSink) Init(ctx context.Context, format formats.Format, compressio
}

func (s *GCloudSink) Send(ctx context.Context, payload *kt.Output) {
// In the un-buffered case, write this out right away.
if payload.NoBuffer && len(payload.Body) > 0 {
go s.send(ctx, payload.Body)
return
}

s.mux.Lock()
defer s.mux.Unlock()
s.buf.Write(payload.Body)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sinks/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (s *S3Sink) Init(ctx context.Context, format formats.Format, compression kt

func (s *S3Sink) Send(ctx context.Context, payload *kt.Output) {
// In the un-buffered case, write this out right away.
if payload.NoBuffer {
if payload.NoBuffer && len(payload.Body) > 0 {
go s.send(ctx, payload.Body)
return
}
Expand Down

0 comments on commit 3755063

Please sign in to comment.