Add new compaction max_block_bytes setting #520

Merged · 5 commits · Feb 10, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@
 * [CHANGE] Fixed ingester latency spikes on read [#461](https://github.com/grafana/tempo/pull/461)
 * [CHANGE] Ingester cut blocks based on size instead of trace count. Replace ingester `traces_per_block` setting with `max_block_bytes`. This is a **breaking change**. [#474](https://github.com/grafana/tempo/issues/474)
 * [CHANGE] Refactor cache section in tempodb. This is a **breaking change** b/c the cache config section has changed. [#485](https://github.com/grafana/tempo/pull/485)
+* [CHANGE] New compactor setting for max block size data instead of traces. [#520](https://github.com/grafana/tempo/pull/520)
 * [FEATURE] Added block compression. This is a **breaking change** b/c some configuration fields moved. [#504](https://github.com/grafana/tempo/pull/504)
 * [ENHANCEMENT] Serve config at the "/config" endpoint. [#446](https://github.com/grafana/tempo/pull/446)
 * [ENHANCEMENT] Switch blocklist polling and retention to different concurrency mechanism, add configuration options. [#475](https://github.com/grafana/tempo/issues/475)
13 changes: 7 additions & 6 deletions docs/tempo/website/configuration/_index.md
@@ -86,12 +86,13 @@ Compactors stream blocks from the storage backend, combine them and write them back
 ```
 compactor:
   compaction:
-    block_retention: 336h              # duration to keep blocks
-    compacted_block_retention: 1h      # duration to keep blocks that have been compacted elsewhere
-    compaction_window: 1h              # blocks in this time window will be compacted together
-    chunk_size_bytes: 10485760         # amount of data to buffer from input blocks
-    flush_size_bytes: 31457280         # flush data to backend when buffer is this large
-    max_compaction_objects: 1000000    # maximum traces in a compacted block
+    block_retention: 336h              # Optional. Duration to keep blocks. Default is 14 days (336h).
+    compacted_block_retention: 1h      # Optional. Duration to keep blocks that have been compacted elsewhere
+    compaction_window: 4h              # Optional. Blocks in this time window will be compacted together
+    chunk_size_bytes: 10485760         # Optional. Amount of data to buffer from input blocks. Default is 10 MiB
+    flush_size_bytes: 31457280         # Optional. Flush data to backend when buffer is this large. Default is 30 MiB
+    max_compaction_objects: 6000000    # Optional. Maximum number of traces in a compacted block. Default is 6 million. Deprecated.
+    max_block_bytes: 107374182400      # Optional. Maximum size of a compacted block in bytes. Default is 100 GiB
     retention_concurrency: 10          # Optional. Number of tenants to process in parallel during retention. Default is 10.
   ring:
     kvstore:
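A note on the byte-valued settings above: they take raw integers, and the defaults quoted in the comments are binary units. As a minimal illustration only (not part of Tempo), a Go sketch showing where the integers in the documented defaults come from:

```go
package main

import "fmt"

// Byte multipliers for the binary units used by the documented defaults.
const (
	MiB uint64 = 1 << 20 // 1048576
	GiB uint64 = 1 << 30 // 1073741824
)

func main() {
	fmt.Println(10 * MiB)  // chunk_size_bytes default:  10485760 (10 MiB)
	fmt.Println(30 * MiB)  // flush_size_bytes default:  31457280 (30 MiB)
	fmt.Println(100 * GiB) // max_block_bytes default:   107374182400 (100 GiB)
}
```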
2 changes: 1 addition & 1 deletion example/docker-compose/etc/tempo-azure.yaml
@@ -26,7 +26,7 @@ ingester:
 compactor:
   compaction:
     compaction_window: 1h              # blocks in this time window will be compacted together
-    max_compaction_objects: 1000000    # maximum size of compacted blocks
+    max_block_bytes: 100_000_000       # maximum size of compacted blocks
     block_retention: 1h
     compacted_block_retention: 10m

2 changes: 1 addition & 1 deletion example/docker-compose/etc/tempo-gcs-fake.yaml
@@ -26,7 +26,7 @@ ingester:
 compactor:
   compaction:
     compaction_window: 1h              # blocks in this time window will be compacted together
-    max_compaction_objects: 1000000    # maximum size of compacted blocks
+    max_block_bytes: 100_000_000       # maximum size of compacted blocks
     block_retention: 1h
     compacted_block_retention: 10m
     flush_size_bytes: 5242880
2 changes: 1 addition & 1 deletion example/docker-compose/etc/tempo-local.yaml
@@ -26,7 +26,7 @@ ingester:
 compactor:
   compaction:
     compaction_window: 1h              # blocks in this time window will be compacted together
-    max_compaction_objects: 1000000    # maximum size of compacted blocks
+    max_block_bytes: 100_000_000       # maximum size of compacted blocks
     block_retention: 1h
     compacted_block_retention: 10m

2 changes: 1 addition & 1 deletion example/docker-compose/etc/tempo-s3-minio.yaml
@@ -26,7 +26,7 @@ ingester:
 compactor:
   compaction:
     compaction_window: 1h              # blocks in this time window will be compacted together
-    max_compaction_objects: 1000000    # maximum size of compacted blocks
+    max_block_bytes: 100_000_000       # maximum size of compacted blocks
     block_retention: 1h
     compacted_block_retention: 10m
     flush_size_bytes: 5242880
1 change: 1 addition & 0 deletions modules/compactor/config.go
@@ -31,6 +31,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)

 	f.DurationVar(&cfg.Compactor.BlockRetention, util.PrefixConfig(prefix, "compaction.block-retention"), 14*24*time.Hour, "Duration to keep blocks/traces.")
 	f.IntVar(&cfg.Compactor.MaxCompactionObjects, util.PrefixConfig(prefix, "compaction.max-objects-per-block"), 6000000, "Maximum number of traces in a compacted block.")
+	f.Uint64Var(&cfg.Compactor.MaxBlockBytes, util.PrefixConfig(prefix, "compaction.max-block-bytes"), 100*1024*1024*1024 /* 100GB */, "Maximum size of a compacted block.")
 	f.DurationVar(&cfg.Compactor.MaxCompactionRange, util.PrefixConfig(prefix, "compaction.compaction-window"), 4*time.Hour, "Maximum time window across which to compact blocks.")
 	cfg.OverrideRingKey = ring.CompactorRingKey
 }
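For readers unfamiliar with the pattern, `f.Uint64Var` here is the standard library's `flag.FlagSet.Uint64Var`, binding the new setting to a prefixed flag name with a 100 GiB default. A self-contained sketch of the same registration pattern; `prefixConfig` below is a hypothetical stand-in for Tempo's `util.PrefixConfig`, assumed to dot-join the prefix and flag name:

```go
package main

import (
	"flag"
	"fmt"
)

// prefixConfig is a hypothetical stand-in for util.PrefixConfig: it
// namespaces a flag name under a component prefix.
func prefixConfig(prefix, name string) string {
	if prefix == "" {
		return name
	}
	return prefix + "." + name
}

func main() {
	var maxBlockBytes uint64

	fs := flag.NewFlagSet("tempo", flag.ExitOnError)
	fs.Uint64Var(&maxBlockBytes, prefixConfig("compactor", "compaction.max-block-bytes"),
		100*1024*1024*1024, // default: 100 GiB
		"Maximum size of a compacted block.")

	// Override the default on the command line.
	_ = fs.Parse([]string{"-compactor.compaction.max-block-bytes=50000000000"})
	fmt.Println(maxBlockBytes) // 50000000000
}
```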
207 changes: 94 additions & 113 deletions tempodb/compaction_block_selector.go
@@ -58,168 +58,149 @@ func (sbs *simpleBlockSelector) BlocksToCompact() ([]*backend.BlockMeta, string)
 // It needs to be reinitialized with updated blocklist.

 type timeWindowBlockSelector struct {
-	blocklist            []*backend.BlockMeta
 	MinInputBlocks       int
 	MaxInputBlocks       int
 	MaxCompactionRange   time.Duration // Size of the time window - say 6 hours
 	MaxCompactionObjects int           // maximum size of compacted objects
+	MaxBlockBytes        uint64        // maximum block size, estimate
+
+	entries []timeWindowBlockEntry
 }
 
+type timeWindowBlockEntry struct {
+	meta  *backend.BlockMeta
+	group string // Blocks in the same group will be compacted together. Sort order also determines group priority.
+	order string // Individual block priority within the group.
+	hash  string // Hash string used for sharding ownership, preserves backwards compatibility
+}
+
 var _ (CompactionBlockSelector) = (*timeWindowBlockSelector)(nil)
 
-func newTimeWindowBlockSelector(blocklist []*backend.BlockMeta, maxCompactionRange time.Duration, maxCompactionObjects int, minInputBlocks int, maxInputBlocks int) CompactionBlockSelector {
+func newTimeWindowBlockSelector(blocklist []*backend.BlockMeta, maxCompactionRange time.Duration, maxCompactionObjects int, maxBlockBytes uint64, minInputBlocks int, maxInputBlocks int) CompactionBlockSelector {
 	twbs := &timeWindowBlockSelector{
-		blocklist:            append([]*backend.BlockMeta(nil), blocklist...),
 		MinInputBlocks:       minInputBlocks,
 		MaxInputBlocks:       maxInputBlocks,
 		MaxCompactionRange:   maxCompactionRange,
 		MaxCompactionObjects: maxCompactionObjects,
+		MaxBlockBytes:        maxBlockBytes,
 	}
 
-	activeWindow := twbs.windowForTime(time.Now().Add(-activeWindowDuration))
-
-	// exclude blocks that fall in last window from active -> inactive cut-over
-	// blocks in this window will not be compacted in order to avoid
-	// ownership conflicts where two compactors process the same block
-	// at the same time as it transitions from last active window to first inactive window.
-	var newBlocks []*backend.BlockMeta
-	for _, b := range twbs.blocklist {
-		if twbs.windowForBlock(b) != activeWindow {
-			newBlocks = append(newBlocks, b)
-		}
-	}
-	twbs.blocklist = newBlocks
-
-	// sort by compaction window, level, and then size
-	sort.Slice(twbs.blocklist, func(i, j int) bool {
-		bi := twbs.blocklist[i]
-		bj := twbs.blocklist[j]
-
-		wi := twbs.windowForBlock(bi)
-		wj := twbs.windowForBlock(bj)
-
-		if activeWindow <= wi && activeWindow <= wj {
-			// inside active window. sort by: compaction lvl -> window -> size
-			// we should always choose the smallest two blocks whose compaction lvl and windows match
-			if bi.CompactionLevel != bj.CompactionLevel {
-				return bi.CompactionLevel < bj.CompactionLevel
-			}
-
-			if wi != wj {
-				return wi > wj
-			}
-		} else {
-			// outside active window. sort by: window -> compaction lvl -> size
-			// we should always choose the most recent two blocks that can be compacted
-			if wi != wj {
-				return wi > wj
-			}
-
-			if bi.CompactionLevel != bj.CompactionLevel {
-				return bi.CompactionLevel < bj.CompactionLevel
-			}
-		}
-
-		return bi.TotalObjects < bj.TotalObjects
-	})
+	now := time.Now()
+	currWindow := twbs.windowForTime(now)
+	activeWindow := twbs.windowForTime(now.Add(-activeWindowDuration))
+
+	for _, b := range blocklist {
+		w := twbs.windowForBlock(b)
+
+		// exclude blocks that fall in last window from active -> inactive cut-over
+		// blocks in this window will not be compacted in order to avoid
+		// ownership conflicts where two compactors process the same block
+		// at the same time as it transitions from last active window to first inactive window.
+		if w == activeWindow {
+			continue
+		}
+
+		entry := timeWindowBlockEntry{
+			meta: b,
+		}
+
+		age := currWindow - w
+		if activeWindow <= w {
+			// inside active window.
+			// Group by compaction level and window.
+			// Choose lowest compaction level and most recent windows first.
+			entry.group = fmt.Sprintf("A-%v-%016X", b.CompactionLevel, age)
+
+			// Within group choose smallest blocks first.
+			entry.order = fmt.Sprintf("%016X", entry.meta.TotalObjects)
+
+			entry.hash = fmt.Sprintf("%v-%v-%v", b.TenantID, b.CompactionLevel, w)
+		} else {
+			// outside active window.
+			// Group by window only. Choose most recent windows first.
+			entry.group = fmt.Sprintf("B-%016X", age)
+
+			// Within group choose lowest compaction lvl and smallest blocks first.
+			entry.order = fmt.Sprintf("%v-%016X", b.CompactionLevel, entry.meta.TotalObjects)
+
+			entry.hash = fmt.Sprintf("%v-%v", b.TenantID, w)
+		}
+
+		twbs.entries = append(twbs.entries, entry)
+	}
+
+	// sort by group then order
+	sort.SliceStable(twbs.entries, func(i, j int) bool {
+		ei := twbs.entries[i]
+		ej := twbs.entries[j]
+
+		if ei.group == ej.group {
+			return ei.order < ej.order
+		}
+		return ei.group < ej.group
+	})
 
 	return twbs
 }
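A note on the key scheme above: the group and order strings work because fixed-width, zero-padded hex (`%016X`) sorts lexicographically in the same order as the underlying non-negative numbers. A standalone sketch demonstrating the property (the sample counts are made up for illustration):

```go
package main

import (
	"fmt"
	"sort"
)

func main() {
	// Object counts in arbitrary order.
	counts := []int{3000000, 42, 7500}

	// Encode each count the way the selector builds its order keys:
	// fixed-width, zero-padded hex, so string order matches numeric order.
	keys := make([]string, 0, len(counts))
	for _, c := range counts {
		keys = append(keys, fmt.Sprintf("%016X", c))
	}

	sort.Strings(keys)
	fmt.Println(keys)
	// [000000000000002A 0000000000001D4C 00000000002DC6C0]
}
```

The `A-`/`B-` group prefixes then guarantee that every active-window group sorts ahead of every inactive-window group in the same way.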

 func (twbs *timeWindowBlockSelector) BlocksToCompact() ([]*backend.BlockMeta, string) {
-	for len(twbs.blocklist) > 0 {
-		// find everything from cursor forward that belongs to this block
-		cursor := 0
-		currentWindow := twbs.windowForBlock(twbs.blocklist[cursor])
-
-		windowBlocks := make([]*backend.BlockMeta, 0)
-		for cursor < len(twbs.blocklist) {
-			currentBlock := twbs.blocklist[cursor]
-
-			if currentWindow != twbs.windowForBlock(currentBlock) {
-				break
-			}
-			cursor++
-
-			windowBlocks = append(windowBlocks, currentBlock)
-		}
-
-		// did we find enough blocks?
-		if len(windowBlocks) >= twbs.MinInputBlocks {
-			var compactBlocks []*backend.BlockMeta
-
-			// blocks in the currently active window
-			// dangerous to use time.Now()
-			activeWindow := twbs.windowForTime(time.Now().Add(-activeWindowDuration))
-			blockWindow := twbs.windowForBlock(windowBlocks[0])
-
-			hashString := fmt.Sprintf("%v", windowBlocks[0].TenantID)
-			compact := true
-
-			// the active window should be compacted by level
-			if activeWindow <= blockWindow {
-				// search forward for inputBlocks in a row that have the same compaction level
-				// Gather as many as possible while staying within limits
-				for i := 0; i <= len(windowBlocks)-twbs.MinInputBlocks+1; i++ {
-					for j := i + 1; j <= len(windowBlocks)-1 &&
-						windowBlocks[i].CompactionLevel == windowBlocks[j].CompactionLevel &&
-						len(compactBlocks)+1 <= twbs.MaxInputBlocks &&
-						totalObjects(compactBlocks)+windowBlocks[j].TotalObjects <= twbs.MaxCompactionObjects; j++ {
-						compactBlocks = windowBlocks[i : j+1]
-					}
-					if len(compactBlocks) > 0 {
-						// Found a stripe of blocks
-						break
-					}
-				}
-
-				compact = false
-				if len(compactBlocks) >= twbs.MinInputBlocks {
-					compact = true
-					hashString = fmt.Sprintf("%v-%v-%v", compactBlocks[0].TenantID, compactBlocks[0].CompactionLevel, currentWindow)
-				}
-			} else { // all other windows will be compacted using their two smallest blocks
-				compactBlocks = windowBlocks[:twbs.MinInputBlocks]
-				hashString = fmt.Sprintf("%v-%v", compactBlocks[0].TenantID, currentWindow)
-			}
-
-			if totalObjects(compactBlocks) > twbs.MaxCompactionObjects {
-				compact = false
-			}
-
-			if compact {
-				// remove the blocks we are returning so we don't consider them again
-				// this is horribly inefficient as it's written
-				for _, blockToCompact := range compactBlocks {
-					for i, block := range twbs.blocklist {
-						if block == blockToCompact {
-							copy(twbs.blocklist[i:], twbs.blocklist[i+1:])
-							twbs.blocklist[len(twbs.blocklist)-1] = nil
-							twbs.blocklist = twbs.blocklist[:len(twbs.blocklist)-1]
-
-							break
-						}
-					}
-				}
-
-				return compactBlocks, hashString
-			}
-		}
-
-		// otherwise update the blocklist
-		twbs.blocklist = twbs.blocklist[cursor:]
+	for len(twbs.entries) > 0 {
+		var chosen []timeWindowBlockEntry
+
+		// find everything from cursor forward that belongs to this group
+		// Gather contiguous blocks while staying within limits
+		i := 0
+		for ; i < len(twbs.entries); i++ {
+			for j := i + 1; j < len(twbs.entries); j++ {
+				stripe := twbs.entries[i : j+1]
+				if twbs.entries[i].group == twbs.entries[j].group &&
+					len(stripe) <= twbs.MaxInputBlocks &&
+					totalObjects(stripe) <= twbs.MaxCompactionObjects &&
+					totalSize(stripe) <= twbs.MaxBlockBytes {
+					chosen = stripe
+				} else {
+					break
+				}
+			}
+			if len(chosen) > 0 {
+				// Found a stripe of blocks
+				break
+			}
+		}
+
+		// Remove entries that were checked so they are not considered again.
+		twbs.entries = twbs.entries[i+len(chosen):]
+
+		// did we find enough blocks?
+		if len(chosen) >= twbs.MinInputBlocks {
+			compactBlocks := make([]*backend.BlockMeta, 0)
+			for _, e := range chosen {
+				compactBlocks = append(compactBlocks, e.meta)
+			}
+
+			return compactBlocks, chosen[0].hash
+		}
 	}
 	return nil, ""
 }

-func totalObjects(blocks []*backend.BlockMeta) int {
+func totalObjects(entries []timeWindowBlockEntry) int {
 	totalObjects := 0
-	for _, b := range blocks {
-		totalObjects += b.TotalObjects
+	for _, b := range entries {
+		totalObjects += b.meta.TotalObjects
 	}
 	return totalObjects
 }
 
+func totalSize(entries []timeWindowBlockEntry) uint64 {
+	sz := uint64(0)
+	for _, b := range entries {
+		sz += b.meta.Size
+	}
+	return sz
+}

 func (twbs *timeWindowBlockSelector) windowForBlock(meta *backend.BlockMeta) int64 {
 	return twbs.windowForTime(meta.EndTime)
 }
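Putting the selector to work, a compactor would repeatedly call `BlocksToCompact` until it returns no blocks. A minimal sketch of that driving loop, using simplified stand-in types (`blockMeta`, `selector` are illustrative only, not Tempo's real `backend.BlockMeta` and `CompactionBlockSelector`):

```go
package main

import "fmt"

// blockMeta is an illustrative stand-in for backend.BlockMeta.
type blockMeta struct {
	TotalObjects int
	Size         uint64
}

// selector is an illustrative stand-in for a compaction block selector:
// it hands out pre-grouped compaction jobs one at a time.
type selector struct {
	queue  [][]*blockMeta
	hashes []string
}

// BlocksToCompact pops the next job, returning nil when nothing is left,
// mirroring the contract of the selector in the diff above.
func (s *selector) BlocksToCompact() ([]*blockMeta, string) {
	if len(s.queue) == 0 {
		return nil, ""
	}
	blocks, hash := s.queue[0], s.hashes[0]
	s.queue, s.hashes = s.queue[1:], s.hashes[1:]
	return blocks, hash
}

func main() {
	s := &selector{
		queue: [][]*blockMeta{
			{{TotalObjects: 100, Size: 1 << 20}, {TotalObjects: 200, Size: 2 << 20}},
		},
		hashes: []string{"single-tenant-0-12345"},
	}

	// The compactor loop: keep asking for work until the selector is drained.
	for blocks, hash := s.BlocksToCompact(); len(blocks) > 0; blocks, hash = s.BlocksToCompact() {
		fmt.Printf("compacting %d blocks (owner hash %q)\n", len(blocks), hash)
	}
}
```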