diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/writer.go b/internal/pkg/service/stream/storage/level/local/diskwriter/writer.go
index 2930fcddcd..d408656e30 100644
--- a/internal/pkg/service/stream/storage/level/local/diskwriter/writer.go
+++ b/internal/pkg/service/stream/storage/level/local/diskwriter/writer.go
@@ -170,6 +170,7 @@ func (w *writer) Close(ctx context.Context) error {
 	w.wg.Wait()
 
 	if w.writen != w.aligned {
+		w.logger.Warnf(ctx, `file is not aligned, truncating`)
 		seeked, err := w.file.Seek(w.aligned-w.writen, io.SeekCurrent)
 		if err == nil {
 			err = w.file.Truncate(seeked)
diff --git a/internal/pkg/service/stream/storage/level/local/encoding/pipeline.go b/internal/pkg/service/stream/storage/level/local/encoding/pipeline.go
index a2015b2027..b6d3769adc 100644
--- a/internal/pkg/service/stream/storage/level/local/encoding/pipeline.go
+++ b/internal/pkg/service/stream/storage/level/local/encoding/pipeline.go
@@ -78,9 +78,10 @@ type NetworkOutput interface {
 // pipeline implements Pipeline interface, it wraps common logic for all file types.
 // For conversion between record values and bytes, the encoder.Encoder is used.
 type pipeline struct {
-	logger   log.Logger
-	sliceKey model.SliceKey
-	events   *events.Events[Pipeline]
+	logger    log.Logger
+	sliceKey  model.SliceKey
+	events    *events.Events[Pipeline]
+	flushLock sync.RWMutex
 
 	encoder encoder.Encoder
 	chain   *writechain.Chain
@@ -271,8 +272,10 @@ func (p *pipeline) WriteRecord(record recordctx.Context) (int, error) {
 	}
 
 	// Format and write table row
+	p.flushLock.RLock()
 	n, err := p.encoder.WriteRecord(record)
 	p.writeWg.Done()
+	p.flushLock.RUnlock()
 	if err != nil {
 		return n, err
 	}
@@ -280,7 +283,7 @@
 	// Get notifier to wait for the next sync
 	notifier := p.syncer.Notifier()
 
-	// Increments number of high-level writes in progress
+	// Increments the number of high-level writes in progress
 	p.acceptedWrites.Add(timestamp, 1)
 
 	// Wait for sync and return sync error, if any
@@ -334,6 +337,9 @@ func (p *pipeline) UncompressedSize() datasize.ByteSize {
 
 // Flush all internal buffers to the NetworkOutput.
 // The method is called periodically by the writesync.Syncer.
 func (p *pipeline) Flush(ctx context.Context) error {
+	p.flushLock.Lock()
+	defer p.flushLock.Unlock()
+
 	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
 	defer cancel()
diff --git a/internal/pkg/service/stream/storage/node/coordinator/filerotation/operator.go b/internal/pkg/service/stream/storage/node/coordinator/filerotation/operator.go
index 1164dc9822..74da734eeb 100644
--- a/internal/pkg/service/stream/storage/node/coordinator/filerotation/operator.go
+++ b/internal/pkg/service/stream/storage/node/coordinator/filerotation/operator.go
@@ -369,13 +369,17 @@ func (o *operator) rotateFile(ctx context.Context, file *fileData) {
 	err = o.storage.File().Rotate(file.FileKey.SinkKey, o.clock.Now()).RequireLock(lock).Do(ctx).Err()
 
 	// Handle error
 	if err != nil {
-		rb.InvokeIfErr(ctx, &err)
-		o.logger.Errorf(ctx, "cannot rotate file: %s", err)
+		// Update the entity; the ctx may be cancelled
+		dbCtx, dbCancel := context.WithTimeout(context.WithoutCancel(ctx), dbOperationTimeout)
+		defer dbCancel()
+
+		rb.InvokeIfErr(dbCtx, &err)
+		o.logger.Errorf(dbCtx, "cannot rotate file: %s", err)
 
 		// Increment retry delay
-		rErr := o.storage.File().IncrementRetryAttempt(file.FileKey, o.clock.Now(), err.Error()).RequireLock(lock).Do(ctx).Err()
+		rErr := o.storage.File().IncrementRetryAttempt(file.FileKey, o.clock.Now(), err.Error()).RequireLock(lock).Do(dbCtx).Err()
 		if rErr != nil {
-			o.logger.Errorf(ctx, "cannot increment file rotation retry attempt: %s", rErr)
+			o.logger.Errorf(dbCtx, "cannot increment file rotation retry attempt: %s", rErr)
 			return
 		}
 	}
@@ -441,7 +445,7 @@ func (o *operator) closeFile(ctx context.Context, file *fileData) {
 	// If there is an error, increment retry delay
 	if err != nil {
 		o.logger.Error(dbCtx, err.Error())
-		fileEntity, rErr := o.storage.File().IncrementRetryAttempt(file.FileKey, o.clock.Now(), err.Error()).RequireLock(lock).Do(ctx).ResultOrErr()
+		fileEntity, rErr := o.storage.File().IncrementRetryAttempt(file.FileKey, o.clock.Now(), err.Error()).RequireLock(lock).Do(dbCtx).ResultOrErr()
 		if rErr != nil {
 			o.logger.Errorf(ctx, "cannot increment file close retry: %s", rErr)
 			return
diff --git a/internal/pkg/service/stream/storage/node/coordinator/slicerotation/operator.go b/internal/pkg/service/stream/storage/node/coordinator/slicerotation/operator.go
index e36e9883d0..8cc0b3ecf1 100644
--- a/internal/pkg/service/stream/storage/node/coordinator/slicerotation/operator.go
+++ b/internal/pkg/service/stream/storage/node/coordinator/slicerotation/operator.go
@@ -297,15 +297,18 @@ func (o *operator) rotateSlice(ctx context.Context, slice *sliceData) {
 	// Handle error
 	if err != nil {
 		var stateErr sliceRepo.UnexpectedFileSliceStatesError
+		// Update the entity; the ctx may be cancelled
+		dbCtx, dbCancel := context.WithTimeout(context.WithoutCancel(ctx), dbOperationTimeout)
+		defer dbCancel()
 		if errors.As(err, &stateErr) && stateErr.FileState != model.FileWriting {
-			o.logger.Info(ctx, "skipped slice rotation, file is already closed")
+			o.logger.Info(dbCtx, "skipped slice rotation, file is already closed")
 		} else {
-			o.logger.Errorf(ctx, "cannot rotate slice: %s", err)
+			o.logger.Errorf(dbCtx, "cannot rotate slice: %s", err)
 
 			// Increment retry delay
-			rErr := o.storage.Slice().IncrementRetryAttempt(slice.SliceKey, o.clock.Now(), err.Error()).RequireLock(lock).Do(ctx).Err()
+			rErr := o.storage.Slice().IncrementRetryAttempt(slice.SliceKey, o.clock.Now(), err.Error()).RequireLock(lock).Do(dbCtx).Err()
 			if rErr != nil {
-				o.logger.Errorf(ctx, "cannot increment file rotation retry attempt: %s", err)
+				o.logger.Errorf(dbCtx, "cannot increment slice rotation retry attempt: %s", rErr)
 				return
 			}
 		}
@@ -364,7 +367,7 @@ func (o *operator) closeSlice(ctx context.Context, slice *sliceData) {
 	// If there is an error, increment retry delay
 	if err != nil {
 		o.logger.Error(dbCtx, err.Error())
-		sliceEntity, rErr := o.storage.Slice().IncrementRetryAttempt(slice.SliceKey, o.clock.Now(), err.Error()).Do(ctx).ResultOrErr()
+		sliceEntity, rErr := o.storage.Slice().IncrementRetryAttempt(slice.SliceKey, o.clock.Now(), err.Error()).Do(dbCtx).ResultOrErr()
 		if rErr != nil {
 			o.logger.Errorf(ctx, "cannot increment slice retry: %s", rErr)
 			return
@@ -393,7 +396,7 @@ func (o *operator) waitForSliceClosing(ctx context.Context, slice *sliceData) (s
 
 	// Make sure the statistics cache is up-to-date
 	if err := o.statisticsCache.WaitForRevision(ctx, slice.ModRevision); err != nil {
-		return statistics.Aggregated{}, errors.PrefixError(err, "error when waiting for statistics cache revision")
+		return statistics.Aggregated{}, errors.PrefixErrorf(err, "error when waiting for statistics cache revision, actual: %v, expected: %v", o.statisticsCache.Revision(), slice.ModRevision)
 	}
 
 	// Get slice statistics
diff --git a/test/stream/bridge/keboola/keboola_test.go b/test/stream/bridge/keboola/keboola_test.go
index 395e497471..8f9fb01bc3 100644
--- a/test/stream/bridge/keboola/keboola_test.go
+++ b/test/stream/bridge/keboola/keboola_test.go
@@ -34,6 +34,11 @@ func TestKeboolaBridgeWorkflow(t *testing.T) {
 
 	// Update configuration to make the cluster testable
 	configFn := func(cfg *config.Config) {
+		// Disable unrelated workers
+		cfg.Storage.DiskCleanup.Enabled = false
+		cfg.Storage.MetadataCleanup.Enabled = false
+		cfg.API.Task.CleanupEnabled = false
+
 		// Use deterministic load balancer
 		cfg.Storage.Level.Local.Writer.Network.PipelineBalancer = network.RoundRobinBalancerType
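A note on the diskwriter change: `Close` seeks backwards from the current offset to the last aligned boundary and truncates the file there, so a partially written block does not survive. A minimal standalone sketch of that arithmetic, assuming illustrative names (`truncateToAligned` is not in the codebase):

```go
package main

import (
	"io"
	"log"
	"os"
)

// truncateToAligned cuts the file back to the last aligned offset.
// The file position is assumed to be at `written` when called,
// mirroring the state in writer.Close above.
func truncateToAligned(f *os.File, written, aligned int64) error {
	if written == aligned {
		return nil // nothing to do, the file ends on a block boundary
	}
	log.Println("file is not aligned, truncating")
	// The seek is relative: stepping back by (written - aligned) lands on `aligned`.
	pos, err := f.Seek(aligned-written, io.SeekCurrent)
	if err != nil {
		return err
	}
	return f.Truncate(pos)
}

func main() {
	f, err := os.CreateTemp("", "aligned-*")
	if err != nil {
		log.Fatal(err)
	}
	defer os.Remove(f.Name())
	defer f.Close()

	// Write 10 bytes, then pretend the last aligned boundary was at 8.
	if _, err := f.Write([]byte("0123456789")); err != nil {
		log.Fatal(err)
	}
	if err := truncateToAligned(f, 10, 8); err != nil {
		log.Fatal(err)
	}
}
```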
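The pipeline.go hunks introduce a `sync.RWMutex` so concurrent `WriteRecord` calls (read lock) cannot interleave with `Flush` (write lock). A minimal sketch of the same pattern under assumed names; the real encoder and write chain are replaced by an atomic counter:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type sketchPipeline struct {
	flushLock sync.RWMutex // writers share it, Flush takes it exclusively
	pending   atomic.Int64 // stands in for the encoder's buffered records
}

// WriteRecord holds only the read lock, so many writers proceed in
// parallel, but none can run while a Flush holds the write lock.
func (p *sketchPipeline) WriteRecord() {
	p.flushLock.RLock()
	defer p.flushLock.RUnlock()
	p.pending.Add(1)
}

// Flush excludes all writers for its duration, so no record is encoded
// while internal buffers are being pushed to the output.
func (p *sketchPipeline) Flush() {
	p.flushLock.Lock()
	defer p.flushLock.Unlock()
	fmt.Println("flushed", p.pending.Swap(0), "records")
}

func main() {
	p := &sketchPipeline{}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); p.WriteRecord() }()
	}
	wg.Wait()
	p.Flush() // prints: flushed 100 records
}
```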
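The rotation operators switch their failure paths from `ctx` to a detached `dbCtx`: `context.WithoutCancel` keeps the parent's values but drops its cancellation, and a fresh timeout bounds the retry-state update even when the original `ctx` is already cancelled, e.g. during shutdown. A sketch of the pattern, assuming Go 1.21+ for `context.WithoutCancel`; `dbOperationTimeout` and `saveRetryState` are illustrative stand-ins, not the real API:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

const dbOperationTimeout = 30 * time.Second

// saveRetryState stands in for a call such as IncrementRetryAttempt;
// it fails if its context is already done.
func saveRetryState(ctx context.Context) error {
	return ctx.Err()
}

func handleRotationError(ctx context.Context, opErr error) {
	// ctx may already be cancelled, but the retry state must still be saved.
	// WithoutCancel inherits ctx values (trace IDs, loggers) without its
	// cancellation; the new timeout keeps the cleanup from hanging forever.
	dbCtx, dbCancel := context.WithTimeout(context.WithoutCancel(ctx), dbOperationTimeout)
	defer dbCancel()

	if rErr := saveRetryState(dbCtx); rErr != nil {
		fmt.Println("cannot increment retry attempt:", rErr)
		return
	}
	fmt.Println("retry state saved after:", opErr)
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate shutdown: the operation context is gone

	// With a plain ctx the update would fail with context.Canceled;
	// on the detached dbCtx it still goes through.
	handleRotationError(ctx, errors.New("cannot rotate slice"))
}
```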