Skip to content

Commit

Permalink
Merge pull request #2059 from keboola/fix-write-after-flush
Browse files Browse the repository at this point in the history
fix: There should be flush lock to prevent incorrect write into gzip file
  • Loading branch information
Matovidlo authored Sep 27, 2024
2 parents 1a66ff1 + e495015 commit 33879e3
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func (w *writer) Close(ctx context.Context) error {
w.wg.Wait()

if w.writen != w.aligned {
w.logger.Warnf(ctx, `file is not aligned, truncating`)
seeked, err := w.file.Seek(w.aligned-w.writen, io.SeekCurrent)
if err == nil {
err = w.file.Truncate(seeked)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,10 @@ type NetworkOutput interface {
// pipeline implements Pipeline interface, it wraps common logic for all file types.
// For conversion between record values and bytes, the encoder.Encoder is used.
type pipeline struct {
logger log.Logger
sliceKey model.SliceKey
events *events.Events[Pipeline]
logger log.Logger
sliceKey model.SliceKey
events *events.Events[Pipeline]
flushLock sync.RWMutex

encoder encoder.Encoder
chain *writechain.Chain
Expand Down Expand Up @@ -271,16 +272,18 @@ func (p *pipeline) WriteRecord(record recordctx.Context) (int, error) {
}

// Format and write table row
p.flushLock.RLock()
n, err := p.encoder.WriteRecord(record)
p.writeWg.Done()
p.flushLock.RUnlock()
if err != nil {
return n, err
}

// Get notifier to wait for the next sync
notifier := p.syncer.Notifier()

// Increments number of high-level writes in progress
// Increments number of high-level writes in progress
p.acceptedWrites.Add(timestamp, 1)

// Wait for sync and return sync error, if any
Expand Down Expand Up @@ -334,6 +337,9 @@ func (p *pipeline) UncompressedSize() datasize.ByteSize {
// Flush all internal buffers to the NetworkOutput.
// The method is called periodically by the writesync.Syncer.
func (p *pipeline) Flush(ctx context.Context) error {
p.flushLock.Lock()
defer p.flushLock.Unlock()

ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,13 +369,17 @@ func (o *operator) rotateFile(ctx context.Context, file *fileData) {
err = o.storage.File().Rotate(file.FileKey.SinkKey, o.clock.Now()).RequireLock(lock).Do(ctx).Err()
// Handle error
if err != nil {
rb.InvokeIfErr(ctx, &err)
o.logger.Errorf(ctx, "cannot rotate file: %s", err)
// Update the entity; the ctx may be cancelled
dbCtx, dbCancel := context.WithTimeout(context.WithoutCancel(ctx), dbOperationTimeout)
defer dbCancel()

rb.InvokeIfErr(dbCtx, &err)
o.logger.Errorf(dbCtx, "cannot rotate file: %s", err)

// Increment retry delay
rErr := o.storage.File().IncrementRetryAttempt(file.FileKey, o.clock.Now(), err.Error()).RequireLock(lock).Do(ctx).Err()
rErr := o.storage.File().IncrementRetryAttempt(file.FileKey, o.clock.Now(), err.Error()).RequireLock(lock).Do(dbCtx).Err()
if rErr != nil {
o.logger.Errorf(ctx, "cannot increment file rotation retry attempt: %s", rErr)
o.logger.Errorf(dbCtx, "cannot increment file rotation retry attempt: %s", rErr)
return
}
}
Expand Down Expand Up @@ -441,7 +445,7 @@ func (o *operator) closeFile(ctx context.Context, file *fileData) {
// If there is an error, increment retry delay
if err != nil {
o.logger.Error(dbCtx, err.Error())
fileEntity, rErr := o.storage.File().IncrementRetryAttempt(file.FileKey, o.clock.Now(), err.Error()).RequireLock(lock).Do(ctx).ResultOrErr()
fileEntity, rErr := o.storage.File().IncrementRetryAttempt(file.FileKey, o.clock.Now(), err.Error()).RequireLock(lock).Do(dbCtx).ResultOrErr()
if rErr != nil {
o.logger.Errorf(ctx, "cannot increment file close retry: %s", rErr)
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,15 +297,18 @@ func (o *operator) rotateSlice(ctx context.Context, slice *sliceData) {
// Handle error
if err != nil {
var stateErr sliceRepo.UnexpectedFileSliceStatesError
// Update the entity; the ctx may be cancelled
dbCtx, dbCancel := context.WithTimeout(context.WithoutCancel(ctx), dbOperationTimeout)
defer dbCancel()
if errors.As(err, &stateErr) && stateErr.FileState != model.FileWriting {
o.logger.Info(ctx, "skipped slice rotation, file is already closed")
o.logger.Info(dbCtx, "skipped slice rotation, file is already closed")
} else {
o.logger.Errorf(ctx, "cannot rotate slice: %s", err)
o.logger.Errorf(dbCtx, "cannot rotate slice: %s", err)

// Increment retry delay
rErr := o.storage.Slice().IncrementRetryAttempt(slice.SliceKey, o.clock.Now(), err.Error()).RequireLock(lock).Do(ctx).Err()
rErr := o.storage.Slice().IncrementRetryAttempt(slice.SliceKey, o.clock.Now(), err.Error()).RequireLock(lock).Do(dbCtx).Err()
if rErr != nil {
o.logger.Errorf(ctx, "cannot increment file rotation retry attempt: %s", err)
o.logger.Errorf(dbCtx, "cannot increment file rotation retry attempt: %s", err)
return
}
}
Expand Down Expand Up @@ -364,7 +367,7 @@ func (o *operator) closeSlice(ctx context.Context, slice *sliceData) {
// If there is an error, increment retry delay
if err != nil {
o.logger.Error(dbCtx, err.Error())
sliceEntity, rErr := o.storage.Slice().IncrementRetryAttempt(slice.SliceKey, o.clock.Now(), err.Error()).Do(ctx).ResultOrErr()
sliceEntity, rErr := o.storage.Slice().IncrementRetryAttempt(slice.SliceKey, o.clock.Now(), err.Error()).Do(dbCtx).ResultOrErr()
if rErr != nil {
o.logger.Errorf(ctx, "cannot increment slice retry: %s", rErr)
return
Expand Down Expand Up @@ -393,7 +396,7 @@ func (o *operator) waitForSliceClosing(ctx context.Context, slice *sliceData) (s

// Make sure the statistics cache is up-to-date
if err := o.statisticsCache.WaitForRevision(ctx, slice.ModRevision); err != nil {
return statistics.Aggregated{}, errors.PrefixError(err, "error when waiting for statistics cache revision")
return statistics.Aggregated{}, errors.PrefixErrorf(err, "error when waiting for statistics cache revision, actual: %v, expected: %v", o.statisticsCache.Revision(), slice.ModRevision)
}

// Get slice statistics
Expand Down
5 changes: 5 additions & 0 deletions test/stream/bridge/keboola/keboola_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ func TestKeboolaBridgeWorkflow(t *testing.T) {

// Update configuration to make the cluster testable
configFn := func(cfg *config.Config) {
// Disable unrelated workers
cfg.Storage.DiskCleanup.Enabled = false
cfg.Storage.MetadataCleanup.Enabled = false
cfg.API.Task.CleanupEnabled = false

// Use deterministic load balancer
cfg.Storage.Level.Local.Writer.Network.PipelineBalancer = network.RoundRobinBalancerType

Expand Down

0 comments on commit 33879e3

Please sign in to comment.