diff --git a/CHANGELOG.md b/CHANGELOG.md index 0016a80bf8a..56028e9b3bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,14 @@ * [BUGFIX] Fixed issue when query-frontend doesn't log request details when request is cancelled [#1136](https://github.com/grafana/tempo/issues/1136) (@adityapwr) * [BUGFIX] Update OTLP port in examples (docker-compose & kubernetes) from legacy ports (55680/55681) to new ports (4317/4318) [#1294](https://github.com/grafana/tempo/pull/1294) (@mapno) * [BUGFIX] Fixes min/max time on blocks to be based on span times instead of ingestion time. [#1314](https://github.com/grafana/tempo/pull/1314) (@joe-elliott) + * Includes new configuration option to restrict the amount of slack around now to update the block start/end time. [#1332](https://github.com/grafana/tempo/pull/1332) (@joe-elliott) + ``` + storage: + trace: + wal: + ingestion_time_range_slack: 2m0s + ``` + * Includes a new metric to determine how often this range is exceeded: `tempo_warnings_total{reason="outside_ingestion_time_slack"}` ## v1.3.2 / 2022-02-23 * [BUGFIX] Fixed an issue where the query-frontend would corrupt start/end time ranges on searches which included the ingesters [#1295] (@joe-elliott) diff --git a/docs/tempo/website/configuration/_index.md b/docs/tempo/website/configuration/_index.md index 1b90e715f2b..23686597bf1 100644 --- a/docs/tempo/website/configuration/_index.md +++ b/docs/tempo/website/configuration/_index.md @@ -651,6 +651,13 @@ storage: # Options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2 [search_encoding: | default = none] + # When a span is written to the WAL it adjusts the start and end times of the block it is written to. + # This block start and end time range is then used when choosing blocks for search. To prevent spans too far + # in the past or future from impacting the block start and end times we use this configuration option. 
+ # This option only allows spans that occur within the configured duration to adjust the block start and + # end times. + [ingestion_time_range_slack: <duration> | default = 2m] + # block configuration block: @@ -671,7 +678,6 @@ storage: # number of bytes per search page [search_page_size_bytes: <int> | default = 1MiB] - ``` ## Memberlist diff --git a/docs/tempo/website/configuration/manifest.md b/docs/tempo/website/configuration/manifest.md index 2e0ea7d1bd6..2cd4305f51c 100644 --- a/docs/tempo/website/configuration/manifest.md +++ b/docs/tempo/website/configuration/manifest.md @@ -316,6 +316,7 @@ storage: blocksfilepath: /tmp/tempo/wal/blocks encoding: snappy search_encoding: none + ingestion_time_range_slack: 2m0s block: index_downsample_bytes: 1048576 index_page_size_bytes: 256000 diff --git a/modules/ingester/ingester.go b/modules/ingester/ingester.go index de3f69ead68..7018b9db80a 100644 --- a/modules/ingester/ingester.go +++ b/modules/ingester/ingester.go @@ -327,6 +327,10 @@ func (i *Ingester) TransferOut(ctx context.Context) error { func (i *Ingester) replayWal() error { level.Info(log.Logger).Log("msg", "beginning wal replay") + // pass i.cfg.MaxBlockDuration into RescanBlocks to make an attempt to set the start time + // of the blocks correctly. as we are scanning traces in the blocks we read their start/end times + // and attempt to set start/end times appropriately. we use now - max_block_duration - ingestion_slack + // as the minimum acceptable start time for a replayed block. 
blocks, err := i.store.WAL().RescanBlocks(func(b []byte, dataEncoding string) (uint32, uint32, error) { d, err := model.NewObjectDecoder(dataEncoding) if err != nil { @@ -342,7 +346,7 @@ func (i *Ingester) replayWal() error { return 0, 0, err } return start, end, nil - }, log.Logger) + }, i.cfg.MaxBlockDuration, log.Logger) if err != nil { return fmt.Errorf("fatal error replaying wal %w", err) } diff --git a/modules/storage/config.go b/modules/storage/config.go index dd5e399509c..612d2acb068 100644 --- a/modules/storage/config.go +++ b/modules/storage/config.go @@ -2,6 +2,7 @@ package storage import ( "flag" + "time" "github.com/grafana/tempo/pkg/cache" @@ -36,6 +37,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) f.StringVar(&cfg.Trace.WAL.Filepath, util.PrefixConfig(prefix, "trace.wal.path"), "/var/tempo/wal", "Path at which store WAL blocks.") cfg.Trace.WAL.Encoding = backend.EncSnappy cfg.Trace.WAL.SearchEncoding = backend.EncNone + cfg.Trace.WAL.IngestionSlack = 2 * time.Minute cfg.Trace.Search = &tempodb.SearchConfig{} cfg.Trace.Search.ChunkSizeBytes = tempodb.DefaultSearchChunkSizeBytes diff --git a/tempodb/wal/append_block.go b/tempodb/wal/append_block.go index 8ffd303ce5a..f338710ca95 100644 --- a/tempodb/wal/append_block.go +++ b/tempodb/wal/append_block.go @@ -21,8 +21,9 @@ const maxDataEncodingLength = 32 // AppendBlock is a block that is actively used to append new objects to. It stores all data in the appendFile // in the order it was received and an in memory sorted index. 
type AppendBlock struct { - meta *backend.BlockMeta - encoding encoding.VersionedEncoding + meta *backend.BlockMeta + encoding encoding.VersionedEncoding + ingestionSlack time.Duration appendFile *os.File appender encoding.Appender @@ -32,7 +33,7 @@ type AppendBlock struct { once sync.Once } -func newAppendBlock(id uuid.UUID, tenantID string, filepath string, e backend.Encoding, dataEncoding string) (*AppendBlock, error) { +func newAppendBlock(id uuid.UUID, tenantID string, filepath string, e backend.Encoding, dataEncoding string, ingestionSlack time.Duration) (*AppendBlock, error) { if strings.ContainsRune(dataEncoding, ':') || len([]rune(dataEncoding)) > maxDataEncodingLength { return nil, fmt.Errorf("dataEncoding %s is invalid", dataEncoding) @@ -44,9 +45,10 @@ func newAppendBlock(id uuid.UUID, tenantID string, filepath string, e backend.En } h := &AppendBlock{ - encoding: v, - meta: backend.NewBlockMeta(tenantID, id, v.Version(), e, dataEncoding), - filepath: filepath, + encoding: v, + meta: backend.NewBlockMeta(tenantID, id, v.Version(), e, dataEncoding), + filepath: filepath, + ingestionSlack: ingestionSlack, } name := h.fullFilename() @@ -69,7 +71,7 @@ func newAppendBlock(id uuid.UUID, tenantID string, filepath string, e backend.En // newAppendBlockFromFile returns an AppendBlock that can not be appended to, but can // be completed. 
It can return a warning or a fatal error
-func newAppendBlockFromFile(filename string, path string, fn RangeFunc) (*AppendBlock, error, error) {
+func newAppendBlockFromFile(filename string, path string, ingestionSlack time.Duration, additionalStartSlack time.Duration, fn RangeFunc) (*AppendBlock, error, error) {
 	var warning error
 	blockID, tenantID, version, e, dataEncoding, err := ParseFilename(filename)
 	if err != nil {
@@ -82,9 +84,10 @@ func newAppendBlockFromFile(filename string, path string, fn RangeFunc) (*Append
 	}
 
 	b := &AppendBlock{
-		meta:     backend.NewBlockMeta(tenantID, blockID, version, e, dataEncoding),
-		filepath: path,
-		encoding: v,
+		meta:           backend.NewBlockMeta(tenantID, blockID, version, e, dataEncoding),
+		filepath:       path,
+		encoding:       v,
+		ingestionSlack: ingestionSlack,
 	}
 
 	// replay file to extract records
@@ -101,6 +104,7 @@ func newAppendBlockFromFile(filename string, path string, fn RangeFunc) (*Append
 		if err != nil {
 			return err
 		}
+		start, end = b.adjustTimeRangeForSlack(start, end, additionalStartSlack)
 		if start < blockStart {
 			blockStart = start
 		}
@@ -128,6 +132,7 @@ func (a *AppendBlock) Append(id common.ID, b []byte, start, end uint32) error {
 	if err != nil {
 		return err
 	}
+	start, end = a.adjustTimeRangeForSlack(start, end, 0)
 	a.meta.ObjectAdded(id, start, end)
 	return nil
 }
@@ -238,3 +243,35 @@ func (a *AppendBlock) file() (*os.File, error) {
 
 	return a.readFile, err
 }
+
+// adjustTimeRangeForSlack limits how far an object's start/end times (unix epoch seconds)
+// may sit from the current wall clock before they are returned to the caller.
+//
+// The accepted window is [now-ingestionSlack-additionalStartSlack, now+ingestionSlack].
+// A start before the window, an end after the window, or an end that precedes the
+// (possibly replaced) start is swapped for now. If anything was swapped, the tenant's
+// outside_ingestion_time_slack warning counter is incremented once.
+func (a *AppendBlock) adjustTimeRangeForSlack(start uint32, end uint32, additionalStartSlack time.Duration) (uint32, uint32) {
+	now := time.Now()
+	nowSeconds := uint32(now.Unix())
+	startOfRange := uint32(now.Add(-a.ingestionSlack).Add(-additionalStartSlack).Unix())
+	endOfRange := uint32(now.Add(a.ingestionSlack).Unix())
+
+	warn := false
+	if start < startOfRange {
+		warn = true
+		start = nowSeconds
+	}
+	// An object entirely in the past has its start moved to now above; without the
+	// end < start check its stale end would be kept and the range would be inverted.
+	if end > endOfRange || end < start {
+		warn = true
+		end = nowSeconds
+	}
+
+	if warn {
+		metricWarnings.WithLabelValues(a.meta.TenantID, reasonOutsideIngestionSlack).Inc()
+	}
+
+	return start, end
+}
diff
--git a/tempodb/wal/wal.go b/tempodb/wal/wal.go index fff7049883e..d4dc8ae9060 100644 --- a/tempodb/wal/wal.go +++ b/tempodb/wal/wal.go @@ -10,12 +10,24 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/google/uuid" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/backend/local" versioned_encoding "github.com/grafana/tempo/tempodb/encoding" ) +const reasonOutsideIngestionSlack = "outside_ingestion_time_slack" + +var ( + metricWarnings = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "tempo", + Name: "warnings_total", + Help: "The total number of warnings per tenant with reason.", + }, []string{"tenant", "reason"}) +) + // extracts a time range from an object. start/end times returned are unix epoch // seconds type RangeFunc func(obj []byte, dataEncoding string) (uint32, uint32, error) @@ -36,6 +48,7 @@ type Config struct { BlocksFilepath string Encoding backend.Encoding `yaml:"encoding"` SearchEncoding backend.Encoding `yaml:"search_encoding"` + IngestionSlack time.Duration `yaml:"ingestion_time_range_slack"` } func New(c *Config) (*WAL, error) { @@ -84,7 +97,7 @@ func New(c *Config) (*WAL, error) { } // RescanBlocks returns a slice of append blocks from the wal folder -func (w *WAL) RescanBlocks(fn RangeFunc, log log.Logger) ([]*AppendBlock, error) { +func (w *WAL) RescanBlocks(fn RangeFunc, additionalStartSlack time.Duration, log log.Logger) ([]*AppendBlock, error) { files, err := os.ReadDir(w.c.Filepath) if err != nil { return nil, err @@ -103,7 +116,7 @@ func (w *WAL) RescanBlocks(fn RangeFunc, log log.Logger) ([]*AppendBlock, error) } level.Info(log).Log("msg", "beginning replay", "file", f.Name(), "size", fileInfo.Size()) - b, warning, err := newAppendBlockFromFile(f.Name(), w.c.Filepath, fn) + b, warning, err := newAppendBlockFromFile(f.Name(), w.c.Filepath, w.c.IngestionSlack, 
additionalStartSlack, fn) remove := false if err != nil { @@ -138,7 +151,7 @@ func (w *WAL) RescanBlocks(fn RangeFunc, log log.Logger) ([]*AppendBlock, error) } func (w *WAL) NewBlock(id uuid.UUID, tenantID string, dataEncoding string) (*AppendBlock, error) { - return newAppendBlock(id, tenantID, w.c.Filepath, w.c.Encoding, dataEncoding) + return newAppendBlock(id, tenantID, w.c.Filepath, w.c.Encoding, dataEncoding, w.c.IngestionSlack) } func (w *WAL) NewFile(blockid uuid.UUID, tenantid string, dir string) (*os.File, string, backend.Encoding, error) { diff --git a/tempodb/wal/wal_test.go b/tempodb/wal/wal_test.go index 8e919483ae2..ca7ef4eb617 100644 --- a/tempodb/wal/wal_test.go +++ b/tempodb/wal/wal_test.go @@ -14,6 +14,7 @@ import ( "github.com/go-kit/log" "github.com/golang/protobuf/proto" "github.com/google/uuid" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/grafana/tempo/pkg/tempopb" @@ -153,7 +154,7 @@ func TestErrorConditions(t *testing.T) { blocks, err := wal.RescanBlocks(func([]byte, string) (uint32, uint32, error) { return 0, 0, nil - }, log.NewNopLogger()) + }, 0, log.NewNopLogger()) require.NoError(t, err, "unexpected error getting blocks") require.Len(t, blocks, 1) @@ -164,10 +165,11 @@ func TestErrorConditions(t *testing.T) { require.NoFileExists(t, filepath.Join(tempDir, "fe0b83eb-a86b-4b6c-9a74-dc272cd5700e:blerg:v2:gzip")) } -func TestAppendBlockStartEnd(t *testing.T) { +func TestAppendBlockStartEnd(t *testing.T) { // jpe extend wal, err := New(&Config{ - Filepath: t.TempDir(), - Encoding: backend.EncNone, + Filepath: t.TempDir(), + Encoding: backend.EncNone, + IngestionSlack: 2 * time.Minute, }) require.NoError(t, err, "unexpected error creating temp wal") @@ -191,12 +193,12 @@ func TestAppendBlockStartEnd(t *testing.T) { require.Equal(t, blockEnd, uint32(block.meta.EndTime.Unix())) // rescan the block and make sure that start/end times are correct - blockStart = uint32(time.Now().Add(time.Hour).Unix()) 
-	blockEnd = uint32(time.Now().Add(2 * time.Hour).Unix())
+	blockStart = uint32(time.Now().Add(-time.Hour).Unix())
+	blockEnd = uint32(time.Now().Unix())
 
 	blocks, err := wal.RescanBlocks(func([]byte, string) (uint32, uint32, error) {
 		return blockStart, blockEnd, nil
-	}, log.NewNopLogger())
+	}, time.Hour, log.NewNopLogger())
 	require.NoError(t, err, "unexpected error getting blocks")
 	require.Len(t, blocks, 1)
 
@@ -204,6 +206,45 @@ func TestAppendBlockStartEnd(t *testing.T) {
 	require.Equal(t, blockEnd, uint32(blocks[0].meta.EndTime.Unix()))
 }
 
+func TestAdjustTimeRangeForSlack(t *testing.T) {
+	a := &AppendBlock{
+		meta: &backend.BlockMeta{
+			TenantID: "test",
+		},
+		ingestionSlack: 2 * time.Minute,
+	}
+
+	// happy path: times at "now" are inside the slack window and must come back untouched
+	start := uint32(time.Now().Unix())
+	end := uint32(time.Now().Unix())
+	actualStart, actualEnd := a.adjustTimeRangeForSlack(start, end, 0)
+	assert.Equal(t, start, actualStart)
+	assert.Equal(t, end, actualEnd)
+
+	// start out of range: the replacement is the callee's own time.Now(), so allow a 1s tick
+	now := uint32(time.Now().Unix())
+	start = uint32(time.Now().Add(-time.Hour).Unix())
+	end = uint32(time.Now().Unix())
+	actualStart, actualEnd = a.adjustTimeRangeForSlack(start, end, 0)
+	assert.InDelta(t, now, actualStart, 1)
+	assert.InDelta(t, end, actualEnd, 1)
+
+	// end out of range: the replacement is the callee's own time.Now(), so allow a 1s tick
+	now = uint32(time.Now().Unix())
+	start = uint32(time.Now().Unix())
+	end = uint32(time.Now().Add(time.Hour).Unix())
+	actualStart, actualEnd = a.adjustTimeRangeForSlack(start, end, 0)
+	assert.Equal(t, start, actualStart)
+	assert.InDelta(t, now, actualEnd, 1)
+
+	// additional start slack honored: an hour-old start is accepted when an extra hour is granted
+	start = uint32(time.Now().Add(-time.Hour).Unix())
+	end = uint32(time.Now().Unix())
+	actualStart, actualEnd = a.adjustTimeRangeForSlack(start, end, time.Hour)
+	assert.Equal(t, start, actualStart)
+	assert.Equal(t, end, actualEnd)
+}
+
 func TestAppendReplayFind(t *testing.T) {
 	for _, e := range backend.SupportedEncoding {
 		t.Run(e.String(), func(t *testing.T) {
@@ -256,7 +297,7 @@ func testAppendReplayFind(t *testing.T, e
backend.Encoding) { blocks, err := wal.RescanBlocks(func([]byte, string) (uint32, uint32, error) { return 0, 0, nil - }, log.NewNopLogger()) + }, 0, log.NewNopLogger()) require.NoError(t, err, "unexpected error getting blocks") require.Len(t, blocks, 1) @@ -489,7 +530,7 @@ func benchmarkWriteFindReplay(b *testing.B, encoding backend.Encoding) { // replay _, err = wal.RescanBlocks(func([]byte, string) (uint32, uint32, error) { return 0, 0, nil - }, log.NewNopLogger()) + }, 0, log.NewNopLogger()) require.NoError(b, err) } }