diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/writer.go b/internal/pkg/service/stream/storage/level/local/diskwriter/writer.go index ba837987f4..75876cacf6 100644 --- a/internal/pkg/service/stream/storage/level/local/diskwriter/writer.go +++ b/internal/pkg/service/stream/storage/level/local/diskwriter/writer.go @@ -146,6 +146,11 @@ func (w *writer) Write(ctx context.Context, aligned bool, p []byte) (n int, err } func (w *writer) Sync(ctx context.Context) error { + if w.isClosed() { + return nil + // return errors.New(`writer is already closed, cannot sync`) + } + w.wg.Add(1) defer w.wg.Done() return w.file.Sync() @@ -158,17 +163,19 @@ func (w *writer) Events() *events.Events[Writer] { func (w *writer) Close(ctx context.Context) error { w.logger.Debug(ctx, "closing disk writer") + errs := errors.NewMultiError() // Close only once if w.isClosed() { return errors.New(`writer is already closed`) } - close(w.closed) - errs := errors.NewMultiError() - - // Wait for running writes + // Close and wait for running writes + close(w.closed) w.wg.Wait() + // Wait for sync / perform sync one more time to be sure that all data are on the disk + _ = w.Sync(ctx) + if w.writen != w.aligned { w.logger.Warnf(ctx, `file is not aligned, truncating`) seeked, err := w.file.Seek(w.aligned-w.writen, io.SeekCurrent)