From 37550639f7072c95ce6bf0456d8ee2f634945952 Mon Sep 17 00:00:00 2001 From: Ian Pye Date: Sun, 27 Oct 2024 21:41:16 -0700 Subject: [PATCH] Adding a guard for 0 bytes on no flush and also adding gcp --- pkg/sinks/file/file.go | 2 +- pkg/sinks/gcloud/gcloud.go | 6 ++++++ pkg/sinks/s3/s3.go | 2 +- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/sinks/file/file.go b/pkg/sinks/file/file.go index 0454e3b1..d2c8b6cf 100644 --- a/pkg/sinks/file/file.go +++ b/pkg/sinks/file/file.go @@ -89,7 +89,7 @@ func (s *FileSink) Init(ctx context.Context, format formats.Format, compression func (s *FileSink) Send(ctx context.Context, payload *kt.Output) { // In the un-buffered case, write this out right away. - if payload.NoBuffer { + if payload.NoBuffer && len(payload.Body) > 0 { go s.writeFileNow(ctx, payload.Body) return } diff --git a/pkg/sinks/gcloud/gcloud.go b/pkg/sinks/gcloud/gcloud.go index e4721cef..741b844a 100644 --- a/pkg/sinks/gcloud/gcloud.go +++ b/pkg/sinks/gcloud/gcloud.go @@ -116,6 +116,12 @@ func (s *GCloudSink) Init(ctx context.Context, format formats.Format, compressio } func (s *GCloudSink) Send(ctx context.Context, payload *kt.Output) { + // In the un-buffered case, write this out right away. + if payload.NoBuffer && len(payload.Body) > 0 { + go s.send(ctx, payload.Body) + return + } + s.mux.Lock() defer s.mux.Unlock() s.buf.Write(payload.Body) diff --git a/pkg/sinks/s3/s3.go b/pkg/sinks/s3/s3.go index 15adcbbc..935c35b0 100644 --- a/pkg/sinks/s3/s3.go +++ b/pkg/sinks/s3/s3.go @@ -166,7 +166,7 @@ func (s *S3Sink) Init(ctx context.Context, format formats.Format, compression kt func (s *S3Sink) Send(ctx context.Context, payload *kt.Output) { // In the un-buffered case, write this out right away. - if payload.NoBuffer { + if payload.NoBuffer && len(payload.Body) > 0 { go s.send(ctx, payload.Body) return }