Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds some flushing instrumentation/logs #3340

Merged
merged 1 commit into from
Feb 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func (i *Ingester) flush(mayRemoveStreams bool) {
}

i.flushQueuesDone.Wait()
level.Debug(util_log.Logger).Log("msg", "flush queues have drained")

}

Expand Down
5 changes: 5 additions & 0 deletions pkg/ingester/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type ingesterMetrics struct {
recoveredEntriesTotal prometheus.Counter
recoveredBytesTotal prometheus.Counter
recoveryBytesInUse prometheus.Gauge
recoveryIsFlushing prometheus.Gauge
}

// setRecoveryBytesInUse bounds the bytes reports to >= 0.
Expand Down Expand Up @@ -107,5 +108,9 @@ func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
Name: "loki_ingester_wal_bytes_in_use",
Help: "Total number of bytes in use by the WAL recovery process.",
}),
recoveryIsFlushing: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "loki_ingester_wal_replay_flushing",
Help: "Whether the WAL replay is in a flushing phase due to backpressure.",
}),
}
}
19 changes: 19 additions & 0 deletions pkg/ingester/replay_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package ingester
import (
"sync"

util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/dustin/go-humanize"
"github.com/go-kit/kit/log/level"
"go.uber.org/atomic"
)

Expand All @@ -28,6 +31,7 @@ func (f *replayFlusher) Flush() {

instance.streamsMtx.Unlock()
}

}

type Flusher interface {
Expand Down Expand Up @@ -70,8 +74,23 @@ func (c *replayController) Cur() int {

func (c *replayController) Flush() {
if c.isFlushing.CAS(false, true) {
c.metrics.recoveryIsFlushing.Set(1)
prior := c.currentBytes.Load()
level.Debug(util_log.Logger).Log(
"msg", "replay flusher pre-flush",
"bytes", humanize.Bytes(uint64(prior)),
)

c.flusher.Flush()

after := c.currentBytes.Load()
level.Debug(util_log.Logger).Log(
"msg", "replay flusher post-flush",
"bytes", humanize.Bytes(uint64(after)),
)

c.isFlushing.Store(false)
c.metrics.recoveryIsFlushing.Set(0)

// Broadcast after lock is acquired to prevent race conditions with cpu scheduling
// where the flush code could finish before the goroutine which initiated it gets to call
Expand Down