Skip to content

Commit

Permalink
Improve mark file processing. (#3762)
Browse files Browse the repository at this point in the history
This makes boltdb batch for deleting entries smaller and so goroutines for deletion of chunks
are not stuck.
Also improve metrics to refresh more often.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena authored May 25, 2021
1 parent e5a3afe commit 89b3283
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 3 deletions.
26 changes: 24 additions & 2 deletions pkg/storage/stores/shipper/compactor/retention/marker.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ func (r *markerProcessor) Start(deleteFunc func(ctx context.Context, chunkId []b
level.Error(util_log.Logger).Log("msg", "failed to list marks path", "path", r.folder, "err", err)
continue
}
r.sweeperMetrics.markerFilesCurrent.Set(float64(len(paths)))
if len(paths) == 0 {
level.Info(util_log.Logger).Log("msg", "no marks file found")
}
Expand All @@ -221,6 +220,29 @@ func (r *markerProcessor) Start(deleteFunc func(ctx context.Context, chunkId []b

}
}()
r.wg.Add(1)
go func() {
defer r.wg.Done()
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
tick := func() {
select {
case <-r.ctx.Done():
case <-ticker.C:
}
}
for ; true; tick() {
if r.ctx.Err() != nil {
return
}
paths, _, err := r.availablePath()
if err != nil {
level.Error(util_log.Logger).Log("msg", "failed to list marks path", "path", r.folder, "err", err)
continue
}
r.sweeperMetrics.markerFilesCurrent.Set(float64(len(paths)))
}
}()
}

func (r *markerProcessor) processPath(path string, deleteFunc func(ctx context.Context, chunkId []byte) error) error {
Expand Down Expand Up @@ -259,7 +281,7 @@ func (r *markerProcessor) processPath(path string, deleteFunc func(ctx context.C
if err != nil {
return err
}
dbUpdate.MaxBatchDelay = 1 * time.Second // 1 s is way enough for saving changes, worst case this operation is idempotent.
dbUpdate.MaxBatchDelay = 5 * time.Millisecond
defer func() {
close(queue)
wg.Wait()
Expand Down
2 changes: 1 addition & 1 deletion production/loki-mixin/dashboards/loki-retention.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ local utils = import 'mixin-utils/utils.libsonnet';
)
.addPanel(
$.panel('Marks Files to Process') +
$.queryPanel(['loki_boltdb_shipper_retention_sweeper_marker_files_current{%s}' % $.namespaceMatcher()], ['count']),
$.queryPanel(['sum(loki_boltdb_shipper_retention_sweeper_marker_files_current{%s})' % $.namespaceMatcher()], ['count']),
)
.addPanel(
$.panel('Delete Rate Per Status') +
Expand Down

0 comments on commit 89b3283

Please sign in to comment.