Skip to content

Commit

Permalink
improve boltdb-shipper logging to help debug index query latency issues (#5651)
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepsukhani authored Mar 17, 2022
1 parent 59bd44a commit 6a533e5
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 12 deletions.
10 changes: 5 additions & 5 deletions pkg/storage/stores/shipper/downloads/index_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,13 @@ func (t *indexSet) Init() (err error) {

defer func() {
if err != nil {
level.Error(util_log.Logger).Log("msg", fmt.Sprintf("failed to initialize table %s, cleaning it up", t.tableName), "err", err)
level.Error(t.logger).Log("msg", "failed to initialize table, cleaning it up", "err", err)
t.err = err

// cleaning up files due to error to avoid returning invalid results.
for fileName := range t.dbs {
if err := t.cleanupDB(fileName); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to cleanup partially downloaded file", "filename", fileName, "err", err)
level.Error(t.logger).Log("msg", "failed to cleanup partially downloaded file", "filename", fileName, "err", err)
}
}
}
Expand Down Expand Up @@ -194,7 +194,7 @@ func (t *indexSet) MultiQueries(ctx context.Context, queries []chunk.IndexQuery,
t.lastUsedAt = time.Now()

logger := util_log.WithContext(ctx, t.logger)
level.Debug(logger).Log("table-name", t.tableName, "query-count", len(queries))
level.Debug(logger).Log("query-count", len(queries), "dbs-count", len(t.dbs))

for name, db := range t.dbs {
err := db.View(func(tx *bbolt.Tx) error {
Expand Down Expand Up @@ -279,7 +279,7 @@ func (t *indexSet) Sync(ctx context.Context) (err error) {

// sync downloads updated and new files from the storage relevant for the table and removes the deleted ones
func (t *indexSet) sync(ctx context.Context, lock bool) (err error) {
level.Debug(util_log.Logger).Log("msg", fmt.Sprintf("syncing files for table %s", t.tableName))
level.Debug(t.logger).Log("msg", "syncing index files")

defer func() {
status := statusSuccess
Expand All @@ -294,7 +294,7 @@ func (t *indexSet) sync(ctx context.Context, lock bool) (err error) {
return err
}

level.Debug(util_log.Logger).Log("msg", fmt.Sprintf("updates for table %s. toDownload: %s, toDelete: %s", t.tableName, toDownload, toDelete))
level.Debug(t.logger).Log("msg", "index sync updates", "toDownload", fmt.Sprint(toDownload), "toDelete", fmt.Sprint(toDelete))

downloadedFiles, err := t.doConcurrentDownload(ctx, toDownload)
if err != nil {
Expand Down
17 changes: 10 additions & 7 deletions pkg/storage/stores/shipper/downloads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
"github.com/grafana/loki/pkg/tenant"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/spanlogger"
)

// timeout for downloading initial files for a table to avoid leaking resources by allowing it to take all the time.
Expand Down Expand Up @@ -158,7 +159,7 @@ func (t *table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, ca

// query both user and common index
for _, uid := range []string{userID, ""} {
indexSet, err := t.getOrCreateIndexSet(uid, true)
indexSet, err := t.getOrCreateIndexSet(ctx, uid, true)
if err != nil {
return err
}
Expand Down Expand Up @@ -258,7 +259,7 @@ func (t *table) Sync(ctx context.Context) error {
// Caller can use IndexSet.AwaitReady() to wait until the IndexSet gets ready, if required.
// forQuerying must be set to true only getting the index for querying since
// it captures the amount of time it takes to download the index at query time.
func (t *table) getOrCreateIndexSet(id string, forQuerying bool) (IndexSet, error) {
func (t *table) getOrCreateIndexSet(ctx context.Context, id string, forQuerying bool) (IndexSet, error) {
t.indexSetsMtx.RLock()
indexSet, ok := t.indexSets[id]
t.indexSetsMtx.RUnlock()
Expand Down Expand Up @@ -293,9 +294,11 @@ func (t *table) getOrCreateIndexSet(id string, forQuerying bool) (IndexSet, erro
if forQuerying {
start := time.Now()
defer func() {
duration := time.Since(start).Seconds()
t.metrics.queryTimeTableDownloadDurationSeconds.WithLabelValues(t.name).Add(duration)
level.Info(loggerWithUserID(t.logger, id)).Log("msg", "downloaded index set at query time", "duration", duration)
duration := time.Since(start)
t.metrics.queryTimeTableDownloadDurationSeconds.WithLabelValues(t.name).Add(duration.Seconds())

logger := spanlogger.FromContextWithFallback(ctx, loggerWithUserID(t.logger, id))
level.Info(logger).Log("msg", "downloaded index set at query time", "duration", duration)
}()
}

Expand All @@ -311,7 +314,7 @@ func (t *table) getOrCreateIndexSet(id string, forQuerying bool) (IndexSet, erro
// EnsureQueryReadiness ensures that we have downloaded the common index as well as user index for the provided userIDs.
// When ensuring query readiness for a table, we will always download common index set because it can include index for one of the provided user ids.
func (t *table) EnsureQueryReadiness(ctx context.Context, userIDs []string) error {
commonIndexSet, err := t.getOrCreateIndexSet("", false)
commonIndexSet, err := t.getOrCreateIndexSet(ctx, "", false)
if err != nil {
return err
}
Expand Down Expand Up @@ -339,7 +342,7 @@ func (t *table) EnsureQueryReadiness(ctx context.Context, userIDs []string) erro
// downloadUserIndexes downloads user specific index files concurrently.
func (t *table) downloadUserIndexes(ctx context.Context, userIDs []string) error {
return concurrency.ForEachJob(ctx, len(userIDs), maxDownloadConcurrency, func(ctx context.Context, idx int) error {
indexSet, err := t.getOrCreateIndexSet(userIDs[idx], false)
indexSet, err := t.getOrCreateIndexSet(ctx, userIDs[idx], false)
if err != nil {
return err
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/storage/stores/shipper/storage/cached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/grafana/loki/pkg/storage/chunk"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/spanlogger"
)

const (
Expand Down Expand Up @@ -124,6 +125,13 @@ func (c *cachedObjectClient) buildCache(ctx context.Context) error {
return nil
}

logger := spanlogger.FromContextWithFallback(ctx, util_log.Logger)
level.Info(logger).Log("msg", "building index list cache")
now := time.Now()
defer func() {
level.Info(logger).Log("msg", "index list cache built", "duration", time.Since(now))
}()

objects, _, err := c.ObjectClient.List(ctx, "", "")
if err != nil {
return err
Expand Down
14 changes: 14 additions & 0 deletions pkg/storage/stores/shipper/util/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import (
"context"
"sync"

"github.com/go-kit/log/level"

"github.com/grafana/loki/pkg/storage/chunk"
util_math "github.com/grafana/loki/pkg/util/math"
"github.com/grafana/loki/pkg/util/spanlogger"
)

const maxQueriesPerGoroutine = 100
Expand All @@ -29,9 +32,18 @@ func QueriesByTable(queries []chunk.IndexQuery) map[string][]chunk.IndexQuery {
}

func DoParallelQueries(ctx context.Context, tableQuerier TableQuerier, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error {
if len(queries) == 0 {
return nil
}
errs := make(chan error)

id := NewIndexDeduper(callback)
defer func() {
logger := spanlogger.FromContext(ctx)
level.Debug(logger).Log("msg", "done processing index queries", "table-name", queries[0].TableName,
"query-count", len(queries), "num-entries-sent", id.numEntriesSent)
}()

if len(queries) <= maxQueriesPerGoroutine {
return tableQuerier.MultiQueries(ctx, queries, id.Callback)
}
Expand Down Expand Up @@ -59,6 +71,7 @@ func DoParallelQueries(ctx context.Context, tableQuerier TableQuerier, queries [
// IndexDeduper wraps a chunk.QueryPagesCallback and forwards each unique
// (hashValue, rangeValue) pair at most once, while counting how many
// entries it has passed through for query-latency debug logging.
type IndexDeduper struct {
callback chunk.QueryPagesCallback // downstream callback invoked for entries not seen before
seenRangeValues map[string]map[string]struct{} // hashValue -> set of rangeValues already forwarded
numEntriesSent int // number of entries forwarded; logged by DoParallelQueries when done
mtx sync.RWMutex // guards seenRangeValues and numEntriesSent
}

Expand Down Expand Up @@ -105,6 +118,7 @@ func (i *IndexDeduper) isSeen(hashValue string, rangeValue []byte) bool {

// add the rangeValue
i.seenRangeValues[hashValue][rangeValueStr] = struct{}{}
i.numEntriesSent++
return false
}

Expand Down

0 comments on commit 6a533e5

Please sign in to comment.