diff --git a/CHANGELOG.md b/CHANGELOG.md index 19a2e326738..05377a7444e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ * [ENHANCEMENT] Add `tempo_ingester_flush_size_bytes` metric. [#777](https://github.com/grafana/tempo/pull/777) (@bboreham) * [ENHANCEMENT] Microservices jsonnet: resource requests and limits can be set in `$._config`. [#793](https://github.com/grafana/tempo/pull/793) (@kvrhdn) * [ENHANCEMENT] Add `-config.expand-env` cli flag to support environment variables expansion in config file. [#796](https://github.com/grafana/tempo/pull/796) (@Ashmita152) +* [ENHANCEMENT] Add ability to control bloom filter caching based on age and/or compaction level. Add new cli command `list cache-summary`. [#805](https://github.com/grafana/tempo/pull/805) (@annanay25) * [ENHANCEMENT] Emit traces for ingester flush operations. [#812](https://github.com/grafana/tempo/pull/812) (@bboreham) * [ENHANCEMENT] Add retry middleware in query-frontend. [#814](https://github.com/grafana/tempo/pull/814) (@kvrhdn) * [CHANGE] Docker images are now prefixed by their branch name [#828](https://github.com/grafana/tempo/pull/828) (@jvrplmlmn) diff --git a/cmd/tempo-cli/cmd-list-cachesummary.go b/cmd/tempo-cli/cmd-list-cachesummary.go new file mode 100644 index 00000000000..c9495bc243a --- /dev/null +++ b/cmd/tempo-cli/cmd-list-cachesummary.go @@ -0,0 +1,90 @@ +package main + +import ( + "fmt" + "os" + "time" + + "github.com/olekukonko/tablewriter" +) + +type listCacheSummaryCmd struct { + TenantID string `arg:"" help:"tenant-id within the bucket"` + backendOptions +} + +func (l *listCacheSummaryCmd) Run(ctx *globalOptions) error { + r, c, err := loadBackend(&l.backendOptions, ctx) + if err != nil { + return err + } + + windowDuration := time.Hour + + results, err := loadBucket(r, c, l.TenantID, windowDuration, false) + if err != nil { + return err + } + + displayCacheSummary(results) + + return nil +} + +func displayCacheSummary(results []blockStats) { + fmt.Println() + fmt.Println("Bloom filter shards by day and compaction level:") + + columns := []string{"bloom filter age"} + out := make([][]string, 0) + bloomTable := make([][]int, 0) + + for _, r := range results { + row := r.CompactionLevel + // extend rows + for len(bloomTable)-1 < int(row) { + bloomTable = append(bloomTable, make([]int, 0)) + } + column := -1 * (int(time.Until(r.StartTime) / (time.Hour * 24))) + // extend column of given row + for len(bloomTable[row])-1 < column { + bloomTable[row] = append(bloomTable[row], 0) + } + // extend columns (header of bloomTable) + for i := len(columns) - 1; i <= column; i++ { + columns = append(columns, fmt.Sprintf("%d days", i)) + } + + if int(row) < len(bloomTable) && column < len(bloomTable[row]) { + bloomTable[row][column] += int(r.BloomShardCount) + } else { + fmt.Println("something wrong with row / column", row, column) + } + } + + fmt.Println() + columnTotals := make([]int, len(columns)-1) + for i, row := range bloomTable { + line := make([]string, 0) + line = append(line, fmt.Sprintf("compaction level %d", i)) + + for j, column := range row { + line = append(line, fmt.Sprintf("%d", column)) + columnTotals[j] += column + } + out = append(out, line) + } + + columnTotalsRow := make([]string, 0, len(columns)) + columnTotalsRow = append(columnTotalsRow, "total") + for _, total := range columnTotals { + columnTotalsRow = append(columnTotalsRow, fmt.Sprintf("%d", total)) + } + + fmt.Println() + w := tablewriter.NewWriter(os.Stdout) + w.SetHeader(columns) + w.AppendBulk(out) + 
w.SetFooter(columnTotalsRow) + w.Render() +} diff --git a/cmd/tempo-cli/cmd-list-compactionsummary.go b/cmd/tempo-cli/cmd-list-compactionsummary.go index fc80b9d5102..9733c96e847 100644 --- a/cmd/tempo-cli/cmd-list-compactionsummary.go +++ b/cmd/tempo-cli/cmd-list-compactionsummary.go @@ -54,10 +54,9 @@ func displayCompactionSummary(results []blockStats) { sort.Ints(levels) - columns := []string{"lvl", "blocks", "total", "smallest block", "largest block", "earliest", "latest"} + columns := []string{"lvl", "blocks", "total", "smallest block", "largest block", "earliest", "latest", "bloom shards"} out := make([][]string, 0) - for _, l := range levels { sizeSum := uint64(0) sizeMin := uint64(0) @@ -65,11 +64,14 @@ func displayCompactionSummary(results []blockStats) { countSum := 0 countMin := 0 countMax := 0 + countBloomShards := 0 + var newest time.Time var oldest time.Time for _, r := range resultsByLevel[l] { sizeSum += r.Size countSum += r.TotalObjects + countBloomShards += int(r.BloomShardCount) if r.Size < sizeMin || sizeMin == 0 { sizeMin = r.Size @@ -110,6 +112,8 @@ func displayCompactionSummary(results []blockStats) { s = fmt.Sprint(time.Since(oldest).Round(time.Second), " ago") case "latest": s = fmt.Sprint(time.Since(newest).Round(time.Second), " ago") + case "bloom shards": + s = fmt.Sprint(countBloomShards) } line = append(line, s) } diff --git a/cmd/tempo-cli/main.go b/cmd/tempo-cli/main.go index 774f3da867d..e88fd3b4177 100644 --- a/cmd/tempo-cli/main.go +++ b/cmd/tempo-cli/main.go @@ -36,6 +36,7 @@ var cli struct { Block listBlockCmd `cmd:"" help:"List information about a block"` Blocks listBlocksCmd `cmd:"" help:"List information about all blocks in a bucket"` CompactionSummary listCompactionSummaryCmd `cmd:"" help:"List summary of data by compaction level"` + CacheSummary listCacheSummaryCmd `cmd:"" help:"List summary of bloom sizes per day per compaction level"` Index listIndexCmd `cmd:"" help:"List information about a block index"` } `cmd:""` diff --git a/cmd/tempo-cli/shared.go b/cmd/tempo-cli/shared.go index 4c215afed80..b2ea2d685bd 100644 --- a/cmd/tempo-cli/shared.go +++ b/cmd/tempo-cli/shared.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sort" + "strconv" "time" "github.com/google/uuid" @@ -60,16 +61,16 @@ func loadBucket(r backend.Reader, c backend.Compactor, tenantID string, windowRa fmt.Println("total blocks: ", len(blockIDs)) // Load in parallel - wg := boundedwaitgroup.New(10) + wg := boundedwaitgroup.New(20) resultsCh := make(chan blockStats, len(blockIDs)) - for _, id := range blockIDs { + for blockNum, id := range blockIDs { wg.Add(1) - go func(id2 uuid.UUID) { + go func(id2 uuid.UUID, blockNum2 int) { defer wg.Done() - b, err := loadBlock(r, c, tenantID, id2, windowRange, includeCompacted) + b, err := loadBlock(r, c, tenantID, id2, blockNum2, windowRange, includeCompacted) if err != nil { fmt.Println("Error loading block:", id2, err) return @@ -78,7 +79,7 @@ func loadBucket(r backend.Reader, c backend.Compactor, tenantID string, windowRa if b != nil { resultsCh <- *b } - }(id) + }(id, blockNum) } wg.Wait() @@ -96,8 +97,11 @@ func loadBucket(r backend.Reader, c backend.Compactor, tenantID string, windowRa return results, nil } -func loadBlock(r backend.Reader, c backend.Compactor, tenantID string, id uuid.UUID, windowRange time.Duration, includeCompacted bool) (*blockStats, error) { +func loadBlock(r backend.Reader, c backend.Compactor, tenantID string, id uuid.UUID, blockNum int, windowRange time.Duration, includeCompacted bool) (*blockStats, error) { 
 	fmt.Print(".")
+	if blockNum%100 == 0 {
+		fmt.Print(strconv.Itoa(blockNum))
+	}
 
 	meta, err := r.BlockMeta(context.Background(), id, tenantID)
 	if err == backend.ErrDoesNotExist && !includeCompacted {
diff --git a/docs/tempo/website/.DS_Store b/docs/tempo/website/.DS_Store
index fa8f84a45e4..a4ab42b5bff 100644
Binary files a/docs/tempo/website/.DS_Store and b/docs/tempo/website/.DS_Store differ
diff --git a/docs/tempo/website/configuration/_index.md b/docs/tempo/website/configuration/_index.md
index 4927a5b96fd..ea023079250 100644
--- a/docs/tempo/website/configuration/_index.md
+++ b/docs/tempo/website/configuration/_index.md
@@ -372,6 +372,16 @@ storage:
         # Example: "cache: memcached"
         [cache: <string>]
 
+        # Minimum compaction level of block to qualify for bloom filter caching. Default is 0 (disabled), meaning
+        # that compaction level is not used to determine if the bloom filter should be cached.
+        # Example: "cache_min_compaction_level: 2"
+        [cache_min_compaction_level: <int>]
+
+        # Max block age to qualify for bloom filter caching. Default is 0 (disabled), meaning that block age is not
+        # used to determine if the bloom filter should be cached.
+        # Example: "cache_max_block_age: 48h"
+        [cache_max_block_age: <duration>]
+
         # Cortex Background cache configuration. Requires having a cache configured.
         background_cache:
diff --git a/docs/tempo/website/operations/cache-summary.png b/docs/tempo/website/operations/cache-summary.png
new file mode 100644
index 00000000000..68dee0af86a
Binary files /dev/null and b/docs/tempo/website/operations/cache-summary.png differ
diff --git a/docs/tempo/website/operations/caching.md b/docs/tempo/website/operations/caching.md
index 9a7fdb1bd1a..b744ab46cb4 100644
--- a/docs/tempo/website/operations/caching.md
+++ b/docs/tempo/website/operations/caching.md
@@ -46,3 +46,33 @@ Too many open connections
 ```
 
 When using the [memcached_exporter](https://github.com/prometheus/memcached_exporter), the number of open connections can be observed at `memcached_current_connections`.
+
+### Cache Size Control
+
+The Tempo querier accesses the bloom filters of all blocks while searching for a trace. This essentially mandates that
+the cache be at least as large as the total size of the bloom filters (the working set). However, in larger deployments,
+the working set might be larger than the desired cache size. When that happens, cache eviction rates climb and hit
+rates drop. Not nice!
+
+Tempo provides two config parameters to control which bloom filters are stored in cache.
+
+```
+  # Min compaction level of block to qualify for caching bloom filter
+  # Example: "cache_min_compaction_level: 2"
+  [cache_min_compaction_level: <int>]
+
+  # Max block age to qualify for caching bloom filter
+  # Example: "cache_max_block_age: 48h"
+  [cache_max_block_age: <duration>]
+```
+
+Using a combination of these options, we can narrow down which bloom filters are cached, thereby reducing the cache
+eviction rate and increasing the cache hit rate. Nice!
+
+So how do we decide the values of these config parameters? We have added a new command to [tempo-cli](../tempo_cli) that
+prints a summary of bloom filter shards per day and per compaction level. The result looks something like this:
+
+<img src="../cache-summary.png" alt="Cache summary">
+
+The above image shows the bloom filter shards over 14 days and 6 compaction levels. Use it to choose values for the
+configuration parameters described above.
\ No newline at end of file
diff --git a/docs/tempo/website/operations/tempo_cli.md b/docs/tempo/website/operations/tempo_cli.md
index 9664b2617a4..a04cf1c8b1c 100644
--- a/docs/tempo/website/operations/tempo_cli.md
+++ b/docs/tempo/website/operations/tempo_cli.md
@@ -133,6 +133,22 @@ Arguments:
   tempo-cli list compaction-summary -c ./tempo.yaml single-tenant
 ```
 
+## List Cache Summary
+Prints information about the number of bloom filter shards per day per compaction level. This command is useful for
+estimating and fine-tuning cache storage. Read the [caching topic](../caching) for more information.
+
+```bash
+tempo-cli list cache-summary <tenant-id>
+```
+
+Arguments:
+- `tenant-id` The tenant ID. Use `single-tenant` for single tenant setups.
+
+**Example:**
+```bash
+tempo-cli list cache-summary -c ./tempo.yaml single-tenant
+```
+
 ## List Index
 Lists basic index info for the given block.
 
diff --git a/tempodb/backend/block_meta.go b/tempodb/backend/block_meta.go
index a1f2597b1eb..6c5048c96bf 100644
--- a/tempodb/backend/block_meta.go
+++ b/tempodb/backend/block_meta.go
@@ -19,7 +19,7 @@ type BlockMeta struct {
 	MinID        []byte    `json:"minID"`        // Minimum object id stored in this block
 	MaxID        []byte    `json:"maxID"`        // Maximum object id stored in this block
 	TenantID     string    `json:"tenantID"`     // ID of tehant to which this block belongs
-	StartTime    time.Time `json:"startTime"`    // Currently mostly meaningless but roughly matches to the time the first obj was written to this block
+	StartTime    time.Time `json:"startTime"`    // Roughly matches when the first obj was written to this block. Used to determine block age for different purposes (caching, etc)
 	EndTime      time.Time `json:"endTime"`      // Currently mostly meaningless but roughly matches to the time the last obj was written to this block
 	TotalObjects int       `json:"totalObjects"` // Total objects in this block
 	Size         uint64    `json:"size"`         // Total size in bytes of the data object
diff --git a/tempodb/compactor.go b/tempodb/compactor.go
index 5700ad572e1..e96fcbf8b29 100644
--- a/tempodb/compactor.go
+++ b/tempodb/compactor.go
@@ -260,7 +260,9 @@ func appendBlock(rw *readerWriter, tracker backend.AppendTracker, block *encodin
 func finishBlock(rw *readerWriter, tracker backend.AppendTracker, block *encoding.StreamingBlock) error {
 	level.Info(rw.logger).Log("msg", "writing compacted block", "block", fmt.Sprintf("%+v", block.BlockMeta()))
 
-	bytesFlushed, err := block.Complete(context.TODO(), tracker, rw.w)
+	w := rw.getWriterForBlock(block.BlockMeta(), time.Now())
+
+	bytesFlushed, err := block.Complete(context.TODO(), tracker, w)
 	if err != nil {
 		return err
 	}
diff --git a/tempodb/config.go b/tempodb/config.go
index 7361b9b3b1b..0c077e04a5a 100644
--- a/tempodb/config.go
+++ b/tempodb/config.go
@@ -21,6 +21,7 @@ const DefaultBlocklistPollConcurrency = uint(50)
 const DefaultRetentionConcurrency = uint(10)
 
 // Config holds the entirety of tempodb configuration
+// Defaults are in modules/storage/config.go
 type Config struct {
 	Pool *pool.Config `yaml:"pool,omitempty"`
 	WAL  *wal.Config  `yaml:"wal"`
@@ -37,10 +38,12 @@ type Config struct {
 	Azure *azure.Config `yaml:"azure"`
 
 	// caches
-	Cache           string                         `yaml:"cache"`
-	BackgroundCache *cortex_cache.BackgroundConfig `yaml:"background_cache"`
-	Memcached       *memcached.Config              `yaml:"memcached"`
-	Redis           *redis.Config                  `yaml:"redis"`
+	Cache                   string                         `yaml:"cache"`
+	CacheMinCompactionLevel uint8                          `yaml:"cache_min_compaction_level"`
+	CacheMaxBlockAge        time.Duration                  `yaml:"cache_max_block_age"`
+	BackgroundCache         *cortex_cache.BackgroundConfig `yaml:"background_cache"`
+	Memcached               *memcached.Config              `yaml:"memcached"`
+	Redis                   *redis.Config                  `yaml:"redis"`
 }
 
 // CompactorConfig contains compaction configuration options
diff --git a/tempodb/encoding/block.go b/tempodb/encoding/block.go
index 8979bf82558..f81c343a8b8 100644
--- a/tempodb/encoding/block.go
+++ b/tempodb/encoding/block.go
@@ -65,7 +65,8 @@ func CopyBlock(ctx context.Context, meta *backend.BlockMeta, src backend.Reader,
 	blockID := meta.BlockID
 	tenantID := meta.TenantID
 
-	copy := func(name string) error {
+	// Copy streams, efficient but can't cache.
+	copyStream := func(name string) error {
 		reader, size, err := src.StreamReader(ctx, name, blockID, tenantID)
 		if err != nil {
 			return errors.Wrapf(err, "error reading %s", name)
 		}
@@ -75,8 +76,18 @@ func CopyBlock(ctx context.Context, meta *backend.BlockMeta, src backend.Reader,
 		return dest.StreamWriter(ctx, name, blockID, tenantID, reader, size)
 	}
 
+	// Read entire object and attempt to cache
+	copy := func(name string) error {
+		b, err := src.Read(ctx, name, blockID, tenantID, true)
+		if err != nil {
+			return errors.Wrapf(err, "error reading %s", name)
+		}
+
+		return dest.Write(ctx, name, blockID, tenantID, b, true)
+	}
+
 	// Data
-	err := copy(nameObjects)
+	err := copyStream(nameObjects)
 	if err != nil {
 		return err
 	}
@@ -90,7 +101,7 @@ func CopyBlock(ctx context.Context, meta *backend.BlockMeta, src backend.Reader,
 	}
 
 	// Index
-	err = copy(nameIndex)
+	err = copyStream(nameIndex)
 	if err != nil {
 		return err
 	}
diff --git a/tempodb/tempodb.go b/tempodb/tempodb.go
index 2df31376e3e..2d1b45c3a36 100644
--- a/tempodb/tempodb.go
+++ b/tempodb/tempodb.go
@@ -92,6 +92,7 @@ type CompactorOverrides interface {
 }
 
 type WriteableBlock interface {
+	BlockMeta() *backend.BlockMeta
 	Write(ctx context.Context, w backend.Writer) error
 }
 
@@ -100,6 +101,9 @@ type readerWriter struct {
 	w backend.Writer
 	c backend.Compactor
 
+	uncachedReader backend.Reader
+	uncachedWriter backend.Writer
+
 	wal  *wal.WAL
 	pool *pool.Pool
 
@@ -145,6 +149,9 @@ func New(cfg *Config, logger log.Logger) (Reader, Writer, Compactor, error) {
 		return nil, nil, nil, err
 	}
 
+	uncachedReader := backend.NewReader(rawR)
+	uncachedWriter := backend.NewWriter(rawW)
+
 	var cacheBackend cortex_cache.Cache
 
 	switch cfg.Cache {
@@ -170,6 +177,8 @@ func New(cfg *Config, logger log.Logger) (Reader, Writer, Compactor, error) {
 	rw := &readerWriter{
 		c:              c,
 		r:              r,
+		uncachedReader: uncachedReader,
+		uncachedWriter: uncachedWriter,
 		w:              w,
 		cfg:            cfg,
 		logger:         logger,
@@ -190,7 +199,8 @@ func New(cfg *Config, logger log.Logger) (Reader, Writer, Compactor, error) {
 }
 
 func (rw *readerWriter) WriteBlock(ctx context.Context, c WriteableBlock) error {
-	return c.Write(ctx, rw.w)
+	w := rw.getWriterForBlock(c.BlockMeta(), time.Now())
+	return c.Write(ctx, w)
 }
 
 // CompleteBlock iterates the given WAL block and flushes it to the TempoDB backend.
@@ -309,9 +319,11 @@ func (rw *readerWriter) Find(ctx context.Context, tenantID string, id common.ID,
 		return nil, nil, nil
 	}
 
+	curTime := time.Now()
 	partialTraces, dataEncodings, err := rw.pool.RunJobs(ctx, copiedBlocklist, func(ctx context.Context, payload interface{}) ([]byte, string, error) {
 		meta := payload.(*backend.BlockMeta)
-		block, err := encoding.NewBackendBlock(meta, rw.r)
+		r := rw.getReaderForBlock(meta, curTime)
+		block, err := encoding.NewBackendBlock(meta, r)
 		if err != nil {
 			return nil, "", err
 		}
@@ -466,6 +478,36 @@ func (rw *readerWriter) updateBlocklist(tenantID string, add []*backend.BlockMet
 	rw.compactedBlockLists[tenantID] = append(rw.compactedBlockLists[tenantID], compactedAdd...)
 }
 
+func (rw *readerWriter) shouldCache(meta *backend.BlockMeta, curTime time.Time) bool {
+	// compaction level is _at least_ CacheMinCompactionLevel
+	if rw.cfg.CacheMinCompactionLevel > 0 && meta.CompactionLevel < rw.cfg.CacheMinCompactionLevel {
+		return false
+	}
+
+	// block is not older than CacheMaxBlockAge
+	if rw.cfg.CacheMaxBlockAge > 0 && curTime.Sub(meta.StartTime) > rw.cfg.CacheMaxBlockAge {
+		return false
+	}
+
+	return true
+}
+
+func (rw *readerWriter) getReaderForBlock(meta *backend.BlockMeta, curTime time.Time) backend.Reader {
+	if rw.shouldCache(meta, curTime) {
+		return rw.r
+	}
+
+	return rw.uncachedReader
+}
+
+func (rw *readerWriter) getWriterForBlock(meta *backend.BlockMeta, curTime time.Time) backend.Writer {
+	if rw.shouldCache(meta, curTime) {
+		return rw.w
+	}
+
+	return rw.uncachedWriter
+}
+
 // includeBlock indicates whether a given block should be included in a backend search
 func includeBlock(b *backend.BlockMeta, id common.ID, blockStart []byte, blockEnd []byte) bool {
 	if bytes.Compare(id, b.MinID) == -1 || bytes.Compare(id, b.MaxID) == 1 {
diff --git a/tempodb/tempodb_test.go b/tempodb/tempodb_test.go
index b890df92955..ed797bc7bdb 100644
--- a/tempodb/tempodb_test.go
+++ b/tempodb/tempodb_test.go
@@ -841,3 +841,70 @@ func TestCompleteBlock(t *testing.T) {
 		assert.True(t, proto.Equal(out, reqs[i]))
 	}
 }
+
+func TestShouldCache(t *testing.T) {
+	tempDir, err := ioutil.TempDir(tmpdir, "")
+	defer os.RemoveAll(tempDir)
+	require.NoError(t, err)
+
+	r, _, _, err := New(&Config{
+		Backend: "local",
+		Local: &local.Config{
+			Path: path.Join(tempDir, "traces"),
+		},
+		Block: &encoding.BlockConfig{
+			IndexDownsampleBytes: 17,
+			BloomFP:              .01,
+			BloomShardSizeBytes:  100_000,
+			Encoding:             backend.EncLZ4_256k,
+			IndexPageSizeBytes:   1000,
+		},
+		WAL: &wal.Config{
+			Filepath: path.Join(tempDir, "wal"),
+		},
+		BlocklistPoll:           0,
+		CacheMaxBlockAge:        time.Hour,
+		CacheMinCompactionLevel: 1,
+	}, log.NewNopLogger())
+	require.NoError(t, err)
+
+	rw := r.(*readerWriter)
+
+	testCases := []struct {
+		name            string
+		compactionLevel uint8
+		startTime       time.Time
+		cache           bool
+	}{
+		{
+			name:            "both pass",
+			compactionLevel: 1,
+			startTime:       time.Now(),
+			cache:           true,
+		},
+		{
+			name:            "startTime fail",
+			compactionLevel: 2,
+			startTime:       time.Now().Add(-2 * time.Hour),
+			cache:           false,
+		},
+		{
+			name:            "compactionLevel fail",
+			compactionLevel: 0,
+			startTime:       time.Now(),
+			cache:           false,
+		},
+		{
+			name:            "both fail",
+			compactionLevel: 0,
+			startTime:       time.Now().Add(-2 * time.Hour),
+			cache:           false,
+		},
+	}
+
+	for _, tt := range testCases {
+		t.Run(tt.name, func(t *testing.T) {
+			assert.Equal(t, tt.cache, rw.shouldCache(&backend.BlockMeta{CompactionLevel: tt.compactionLevel, StartTime: tt.startTime}, time.Now()))
+		})
+	}
+}
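
To see the gating logic from `tempodb/tempodb.go` in isolation, here is a minimal, self-contained Go sketch of the same predicate the diff adds as `(*readerWriter).shouldCache`. The `config` and `blockMeta` types below are simplified stand-ins for Tempo's `tempodb.Config` and `backend.BlockMeta` (only the fields the predicate reads are kept), not the real structs:

```go
package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins for tempodb.Config and backend.BlockMeta.
type config struct {
	CacheMinCompactionLevel uint8         // 0 disables the level check
	CacheMaxBlockAge        time.Duration // 0 disables the age check
}

type blockMeta struct {
	CompactionLevel uint8
	StartTime       time.Time
}

// shouldCache mirrors the predicate in the diff: cache the bloom filter only
// if the block's compaction level is at least CacheMinCompactionLevel and
// the block is no older than CacheMaxBlockAge.
func shouldCache(cfg config, meta blockMeta, curTime time.Time) bool {
	if cfg.CacheMinCompactionLevel > 0 && meta.CompactionLevel < cfg.CacheMinCompactionLevel {
		return false
	}
	if cfg.CacheMaxBlockAge > 0 && curTime.Sub(meta.StartTime) > cfg.CacheMaxBlockAge {
		return false
	}
	return true
}

func main() {
	cfg := config{CacheMinCompactionLevel: 2, CacheMaxBlockAge: 48 * time.Hour}
	now := time.Now()

	blocks := []struct {
		name string
		meta blockMeta
	}{
		{"deep and fresh", blockMeta{CompactionLevel: 3, StartTime: now.Add(-1 * time.Hour)}},
		{"deep but old", blockMeta{CompactionLevel: 3, StartTime: now.Add(-72 * time.Hour)}},
		{"fresh but shallow", blockMeta{CompactionLevel: 1, StartTime: now.Add(-1 * time.Hour)}},
	}
	for _, b := range blocks {
		fmt.Printf("%-18s cache=%v\n", b.name, shouldCache(cfg, b.meta, now))
	}
	// deep and fresh     cache=true
	// deep but old       cache=false   (older than 48h)
	// fresh but shallow  cache=false   (below compaction level 2)
}
```

A zero value for either option disables the corresponding check, which is why both default to 0 ("disabled") in the configuration docs above.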
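
The day bucketing in `displayCacheSummary` (`cmd/tempo-cli/cmd-list-cachesummary.go` above) relies on a small trick: `time.Until(StartTime)` is negative for blocks that started in the past, so negating the truncating division by 24h yields the block's age in whole days. A standalone sketch, with made-up block ages, to make the column arithmetic concrete:

```go
package main

import (
	"fmt"
	"time"
)

// dayBucket reproduces the column computation from displayCacheSummary:
// -1 * int(time.Until(start) / (time.Hour * 24)), i.e. the block's age in
// whole days (0 = today, 1 = yesterday, and so on).
func dayBucket(start time.Time) int {
	return -1 * int(time.Until(start)/(time.Hour*24))
}

func main() {
	now := time.Now()
	// Hypothetical block ages, for illustration only.
	for _, age := range []time.Duration{2 * time.Hour, 30 * time.Hour, 75 * time.Hour} {
		start := now.Add(-age)
		fmt.Printf("block started %v ago -> %q column\n", age, fmt.Sprintf("%d days", dayBucket(start)))
	}
	// Prints columns 0, 1 and 3: the division truncates toward zero, so a
	// block lands in bucket N once it is at least N*24h old.
}
```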