TempoDB: Restrict Ingestion time range #1332

Merged 5 commits on Mar 8, 2022
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -21,6 +21,14 @@
* [BUGFIX] Fixed issue when query-frontend doesn't log request details when request is cancelled [#1136](https://github.com/grafana/tempo/issues/1136) (@adityapwr)
* [BUGFIX] Update OTLP port in examples (docker-compose & kubernetes) from legacy ports (55680/55681) to new ports (4317/4318) [#1294](https://github.com/grafana/tempo/pull/1294) (@mapno)
* [BUGFIX] Fixes min/max time on blocks to be based on span times instead of ingestion time. [#1314](https://github.com/grafana/tempo/pull/1314) (@joe-elliott)
* Includes new configuration option to restrict the amount of slack around now to update the block start/end time. [#1332](https://github.com/grafana/tempo/pull/1332) (@joe-elliott)
```
storage:
trace:
wal:
ingestion_time_range_slack: 2m0s
```
* Includes a new metric to determine how often this range is exceeded: `tempo_warnings_total{reason="outside_ingestion_time_slack"}`

## v1.3.2 / 2022-02-23
* [BUGFIX] Fixed an issue where the query-frontend would corrupt start/end time ranges on searches which included the ingesters [#1295] (@joe-elliott)
8 changes: 7 additions & 1 deletion docs/tempo/website/configuration/_index.md
@@ -651,6 +651,13 @@ storage:
# Options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
[search_encoding: <string> | default = none]

# When a span is written to the WAL it adjusts the start and end times of the block it is written to.
# This block start and end time range is then used when choosing blocks for search. To prevent spans too far
# in the past or future from impacting the block start and end times we use this configuration option.
# This option only allows spans that occur within the configured duration to adjust the block start and
# end times.
[ingestion_time_range_slack: <duration> | default = 2m]

# block configuration
block:

Expand All @@ -671,7 +678,6 @@ storage:

# number of bytes per search page
[search_page_size_bytes: <int> | default = 1MiB]

```

## Memberlist
1 change: 1 addition & 0 deletions docs/tempo/website/configuration/manifest.md
@@ -316,6 +316,7 @@ storage:
blocksfilepath: /tmp/tempo/wal/blocks
encoding: snappy
search_encoding: none
ingestion_time_range_slack: 2m0s
block:
index_downsample_bytes: 1048576
index_page_size_bytes: 256000
6 changes: 5 additions & 1 deletion modules/ingester/ingester.go
@@ -327,6 +327,10 @@ func (i *Ingester) TransferOut(ctx context.Context) error {
func (i *Ingester) replayWal() error {
level.Info(log.Logger).Log("msg", "beginning wal replay")

// pass i.cfg.MaxBlockDuration into RescanBlocks to make an attempt to set the start time
// of the blocks correctly. as we are scanning traces in the blocks we read their start/end times
// and attempt to set start/end times appropriately. we use now - max_block_duration - ingestion_slack
// as the minimum acceptable start time for a replayed block.
blocks, err := i.store.WAL().RescanBlocks(func(b []byte, dataEncoding string) (uint32, uint32, error) {
d, err := model.NewObjectDecoder(dataEncoding)
if err != nil {
@@ -342,7 +346,7 @@ func (i *Ingester) replayWal() error {
return 0, 0, err
}
return start, end, nil
}, log.Logger)
}, i.cfg.MaxBlockDuration, log.Logger)
Review comment (Contributor): Yeah agree this is a hard spot but understand the reasons, and this is reasonable. Maybe a comment?

if err != nil {
return fmt.Errorf("fatal error replaying wal %w", err)
}
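
To make the replay window described in the comment above concrete, here is a minimal standalone sketch (not code from this PR) of the earliest block start time a replayed span may set: now - max_block_duration - ingestion_time_range_slack. The variable names and the 30-minute max block duration are illustrative assumptions, not values taken from the Tempo codebase.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Assumed settings: the ingester's max block duration and the new
	// ingestion_time_range_slack default from this PR.
	maxBlockDuration := 30 * time.Minute
	ingestionSlack := 2 * time.Minute

	now := time.Now()
	// During replay, MaxBlockDuration is passed to RescanBlocks as additional
	// start slack, so the acceptable window for replayed span times is:
	minAcceptableStart := now.Add(-maxBlockDuration - ingestionSlack)
	maxAcceptableEnd := now.Add(ingestionSlack)

	fmt.Printf("replayed block start/end clamped to [%v, %v]\n",
		minAcceptableStart.Unix(), maxAcceptableEnd.Unix())
}
```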
2 changes: 2 additions & 0 deletions modules/storage/config.go
@@ -2,6 +2,7 @@ package storage

import (
"flag"
"time"

"github.com/grafana/tempo/pkg/cache"

@@ -36,6 +37,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
f.StringVar(&cfg.Trace.WAL.Filepath, util.PrefixConfig(prefix, "trace.wal.path"), "/var/tempo/wal", "Path at which store WAL blocks.")
cfg.Trace.WAL.Encoding = backend.EncSnappy
cfg.Trace.WAL.SearchEncoding = backend.EncNone
cfg.Trace.WAL.IngestionSlack = 2 * time.Minute

cfg.Trace.Search = &tempodb.SearchConfig{}
cfg.Trace.Search.ChunkSizeBytes = tempodb.DefaultSearchChunkSizeBytes
47 changes: 37 additions & 10 deletions tempodb/wal/append_block.go
@@ -21,8 +21,9 @@ const maxDataEncodingLength = 32
// AppendBlock is a block that is actively used to append new objects to. It stores all data in the appendFile
// in the order it was received and an in memory sorted index.
type AppendBlock struct {
meta *backend.BlockMeta
encoding encoding.VersionedEncoding
meta *backend.BlockMeta
encoding encoding.VersionedEncoding
ingestionSlack time.Duration

appendFile *os.File
appender encoding.Appender
@@ -32,7 +33,7 @@ type AppendBlock struct {
once sync.Once
}

func newAppendBlock(id uuid.UUID, tenantID string, filepath string, e backend.Encoding, dataEncoding string) (*AppendBlock, error) {
func newAppendBlock(id uuid.UUID, tenantID string, filepath string, e backend.Encoding, dataEncoding string, ingestionSlack time.Duration) (*AppendBlock, error) {
if strings.ContainsRune(dataEncoding, ':') ||
len([]rune(dataEncoding)) > maxDataEncodingLength {
return nil, fmt.Errorf("dataEncoding %s is invalid", dataEncoding)
@@ -44,9 +45,10 @@ func newAppendBlock(id uuid.UUID, tenantID string, filepath string, e backend.En
}

h := &AppendBlock{
encoding: v,
meta: backend.NewBlockMeta(tenantID, id, v.Version(), e, dataEncoding),
filepath: filepath,
encoding: v,
meta: backend.NewBlockMeta(tenantID, id, v.Version(), e, dataEncoding),
filepath: filepath,
ingestionSlack: ingestionSlack,
}

name := h.fullFilename()
@@ -69,7 +71,7 @@ func newAppendBlock(id uuid.UUID, tenantID string, filepath string, e backend.En

// newAppendBlockFromFile returns an AppendBlock that can not be appended to, but can
// be completed. It can return a warning or a fatal error
func newAppendBlockFromFile(filename string, path string, fn RangeFunc) (*AppendBlock, error, error) {
func newAppendBlockFromFile(filename string, path string, ingestionSlack time.Duration, additionalStartSlack time.Duration, fn RangeFunc) (*AppendBlock, error, error) {
var warning error
blockID, tenantID, version, e, dataEncoding, err := ParseFilename(filename)
if err != nil {
@@ -82,9 +84,10 @@ func newAppendBlockFromFile(filename string, path string, fn RangeFunc) (*Append
}

b := &AppendBlock{
meta: backend.NewBlockMeta(tenantID, blockID, version, e, dataEncoding),
filepath: path,
encoding: v,
meta: backend.NewBlockMeta(tenantID, blockID, version, e, dataEncoding),
filepath: path,
encoding: v,
ingestionSlack: ingestionSlack,
}

// replay file to extract records
@@ -101,6 +104,7 @@ func newAppendBlockFromFile(filename string, path string, fn RangeFunc) (*Append
if err != nil {
return err
}
start, end = b.adjustTimeRangeForSlack(start, end, additionalStartSlack)
if start < blockStart {
blockStart = start
}
@@ -128,6 +132,7 @@ func (a *AppendBlock) Append(id common.ID, b []byte, start, end uint32) error {
if err != nil {
return err
}
start, end = a.adjustTimeRangeForSlack(start, end, 0)
a.meta.ObjectAdded(id, start, end)
return nil
}
@@ -238,3 +243,25 @@ func (a *AppendBlock) file() (*os.File, error) {

return a.readFile, err
}

func (a *AppendBlock) adjustTimeRangeForSlack(start uint32, end uint32, additionalStartSlack time.Duration) (uint32, uint32) {
now := time.Now()
startOfRange := uint32(now.Add(-a.ingestionSlack).Add(-additionalStartSlack).Unix())
endOfRange := uint32(now.Add(a.ingestionSlack).Unix())

warn := false
if start < startOfRange {
warn = true
start = uint32(now.Unix())
Review comment (Contributor): Should these clip to the range instead of now?

Reply from @joe-elliott (Member, Author), Mar 8, 2022: The reason I decided to use time.Now() is because we know we are regularly receiving backdated spans from multiple weeks ago. If we use start instead of time.Now(), then every block's time range will always start ingestion_time_range_slack earlier than it actually does. Also, every replayed block's start time will always be replay time - max_block_duration - ingestion_time_range_slack.

}
if end > endOfRange {
warn = true
end = uint32(now.Unix())
}

if warn {
metricWarnings.WithLabelValues(a.meta.TenantID, reasonOutsideIngestionSlack).Inc()
}

return start, end
}
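
For readers following the review thread above, here is a small standalone sketch of the clamping behavior adjustTimeRangeForSlack implements: timestamps outside [now - slack - additionalStartSlack, now + slack] are reset to now rather than clipped to the window edge. The helper name clampToSlack and the omission of the warning metric are assumptions made for illustration only.

```go
package main

import (
	"fmt"
	"time"
)

// clampToSlack mirrors the logic of AppendBlock.adjustTimeRangeForSlack:
// an out-of-range start or end is replaced with "now", per the author's
// reasoning about heavily backdated spans.
func clampToSlack(start, end uint32, slack, additionalStartSlack time.Duration) (uint32, uint32) {
	now := time.Now()
	startOfRange := uint32(now.Add(-slack).Add(-additionalStartSlack).Unix())
	endOfRange := uint32(now.Add(slack).Unix())

	if start < startOfRange {
		start = uint32(now.Unix()) // backdated span: pull block start up to now
	}
	if end > endOfRange {
		end = uint32(now.Unix()) // future span: pull block end back to now
	}
	return start, end
}

func main() {
	slack := 2 * time.Minute
	// A span claiming to start an hour ago and end an hour from now gets
	// both ends clamped to the current time.
	start := uint32(time.Now().Add(-time.Hour).Unix())
	end := uint32(time.Now().Add(time.Hour).Unix())
	s, e := clampToSlack(start, end, slack, 0)
	fmt.Println(s, e)
}
```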
19 changes: 16 additions & 3 deletions tempodb/wal/wal.go
@@ -10,12 +10,24 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/backend/local"
versioned_encoding "github.com/grafana/tempo/tempodb/encoding"
)

const reasonOutsideIngestionSlack = "outside_ingestion_time_slack"

var (
metricWarnings = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "tempo",
Name: "warnings_total",
Help: "The total number of warnings per tenant with reason.",
}, []string{"tenant", "reason"})
)

// extracts a time range from an object. start/end times returned are unix epoch
// seconds
type RangeFunc func(obj []byte, dataEncoding string) (uint32, uint32, error)
@@ -36,6 +48,7 @@ type Config struct {
BlocksFilepath string
Encoding backend.Encoding `yaml:"encoding"`
SearchEncoding backend.Encoding `yaml:"search_encoding"`
IngestionSlack time.Duration `yaml:"ingestion_time_range_slack"`
}

func New(c *Config) (*WAL, error) {
@@ -84,7 +97,7 @@ func New(c *Config) (*WAL, error) {
}

// RescanBlocks returns a slice of append blocks from the wal folder
func (w *WAL) RescanBlocks(fn RangeFunc, log log.Logger) ([]*AppendBlock, error) {
func (w *WAL) RescanBlocks(fn RangeFunc, additionalStartSlack time.Duration, log log.Logger) ([]*AppendBlock, error) {
files, err := os.ReadDir(w.c.Filepath)
if err != nil {
return nil, err
@@ -103,7 +116,7 @@ func (w *WAL) RescanBlocks(fn RangeFunc, log log.Logger) ([]*AppendBlock, error)
}

level.Info(log).Log("msg", "beginning replay", "file", f.Name(), "size", fileInfo.Size())
b, warning, err := newAppendBlockFromFile(f.Name(), w.c.Filepath, fn)
b, warning, err := newAppendBlockFromFile(f.Name(), w.c.Filepath, w.c.IngestionSlack, additionalStartSlack, fn)

remove := false
if err != nil {
@@ -138,7 +151,7 @@ func (w *WAL) RescanBlocks(fn RangeFunc, log log.Logger) ([]*AppendBlock, error)
}

func (w *WAL) NewBlock(id uuid.UUID, tenantID string, dataEncoding string) (*AppendBlock, error) {
return newAppendBlock(id, tenantID, w.c.Filepath, w.c.Encoding, dataEncoding)
return newAppendBlock(id, tenantID, w.c.Filepath, w.c.Encoding, dataEncoding, w.c.IngestionSlack)
}

func (w *WAL) NewFile(blockid uuid.UUID, tenantid string, dir string) (*os.File, string, backend.Encoding, error) {
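
As a usage sketch only (not part of this PR), the new IngestionSlack field in the Config struct above can be set when constructing a WAL programmatically, mirroring the test setup in wal_test.go below; the file path and encoding here are placeholder values.

```go
package main

import (
	"log"
	"time"

	"github.com/grafana/tempo/tempodb/backend"
	"github.com/grafana/tempo/tempodb/wal"
)

func main() {
	// Build a WAL whose blocks only let span times within +/- 2 minutes of now
	// adjust the block start/end (plus any additional start slack passed to
	// RescanBlocks during replay).
	w, err := wal.New(&wal.Config{
		Filepath:       "/var/tempo/wal", // placeholder path
		Encoding:       backend.EncSnappy,
		IngestionSlack: 2 * time.Minute,
	})
	if err != nil {
		log.Fatal(err)
	}
	_ = w
}
```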
59 changes: 50 additions & 9 deletions tempodb/wal/wal_test.go
@@ -14,6 +14,7 @@ import (
"github.com/go-kit/log"
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/tempo/pkg/tempopb"
@@ -153,7 +154,7 @@ func TestErrorConditions(t *testing.T) {

blocks, err := wal.RescanBlocks(func([]byte, string) (uint32, uint32, error) {
return 0, 0, nil
}, log.NewNopLogger())
}, 0, log.NewNopLogger())
require.NoError(t, err, "unexpected error getting blocks")
require.Len(t, blocks, 1)

@@ -164,10 +165,11 @@ func TestErrorConditions(t *testing.T) {
require.NoFileExists(t, filepath.Join(tempDir, "fe0b83eb-a86b-4b6c-9a74-dc272cd5700e:blerg:v2:gzip"))
}

func TestAppendBlockStartEnd(t *testing.T) {
func TestAppendBlockStartEnd(t *testing.T) { // jpe extend
wal, err := New(&Config{
Filepath: t.TempDir(),
Encoding: backend.EncNone,
Filepath: t.TempDir(),
Encoding: backend.EncNone,
IngestionSlack: 2 * time.Minute,
})
require.NoError(t, err, "unexpected error creating temp wal")

@@ -191,19 +193,58 @@ func TestAppendBlockStartEnd(t *testing.T) {
require.Equal(t, blockEnd, uint32(block.meta.EndTime.Unix()))

// rescan the block and make sure that start/end times are correct
blockStart = uint32(time.Now().Add(time.Hour).Unix())
blockEnd = uint32(time.Now().Add(2 * time.Hour).Unix())
blockStart = uint32(time.Now().Add(-time.Hour).Unix())
blockEnd = uint32(time.Now().Unix())

blocks, err := wal.RescanBlocks(func([]byte, string) (uint32, uint32, error) {
return blockStart, blockEnd, nil
}, log.NewNopLogger())
}, time.Hour, log.NewNopLogger())
require.NoError(t, err, "unexpected error getting blocks")
require.Len(t, blocks, 1)

require.Equal(t, blockStart, uint32(blocks[0].meta.StartTime.Unix()))
require.Equal(t, blockEnd, uint32(blocks[0].meta.EndTime.Unix()))
}

func TestAdjustTimeRangeForSlack(t *testing.T) {
a := &AppendBlock{
meta: &backend.BlockMeta{
TenantID: "test",
},
ingestionSlack: 2 * time.Minute,
}

// test happy path
start := uint32(time.Now().Unix())
end := uint32(time.Now().Unix())
actualStart, actualEnd := a.adjustTimeRangeForSlack(start, end, 0)
assert.Equal(t, start, actualStart)
assert.Equal(t, end, actualEnd)

// test start out of range
now := uint32(time.Now().Unix())
start = uint32(time.Now().Add(-time.Hour).Unix())
end = uint32(time.Now().Unix())
actualStart, actualEnd = a.adjustTimeRangeForSlack(start, end, 0)
assert.Equal(t, now, actualStart)
assert.Equal(t, end, actualEnd)

// test end out of range
now = uint32(time.Now().Unix())
start = uint32(time.Now().Unix())
end = uint32(time.Now().Add(time.Hour).Unix())
actualStart, actualEnd = a.adjustTimeRangeForSlack(start, end, 0)
assert.Equal(t, start, actualStart)
assert.Equal(t, now, actualEnd)

// test additional start slack honored
start = uint32(time.Now().Add(-time.Hour).Unix())
end = uint32(time.Now().Unix())
actualStart, actualEnd = a.adjustTimeRangeForSlack(start, end, time.Hour)
assert.Equal(t, start, actualStart)
assert.Equal(t, end, actualEnd)
}

func TestAppendReplayFind(t *testing.T) {
for _, e := range backend.SupportedEncoding {
t.Run(e.String(), func(t *testing.T) {
@@ -256,7 +297,7 @@ func testAppendReplayFind(t *testing.T, e backend.Encoding) {

blocks, err := wal.RescanBlocks(func([]byte, string) (uint32, uint32, error) {
return 0, 0, nil
}, log.NewNopLogger())
}, 0, log.NewNopLogger())
require.NoError(t, err, "unexpected error getting blocks")
require.Len(t, blocks, 1)

@@ -489,7 +530,7 @@ func benchmarkWriteFindReplay(b *testing.B, encoding backend.Encoding) {
// replay
_, err = wal.RescanBlocks(func([]byte, string) (uint32, uint32, error) {
return 0, 0, nil
}, log.NewNopLogger())
}, 0, log.NewNopLogger())
require.NoError(b, err)
}
}