Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compaction planning fixes #6952

Merged
merged 4 commits into from
Jul 14, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -91,6 +91,7 @@ With this release the systemd configuration files for InfluxDB will use the syst
- [#6968](https://github.com/influxdata/influxdb/issues/6968): Always use the demo config when outputting a new config.
- [#6986](https://github.com/influxdata/influxdb/pull/6986): update connection settings when changing hosts in cli.
- [#6965](https://github.com/influxdata/influxdb/pull/6965): Minor improvements to init script. Removes sysvinit-utils as package dependency.
- [#6952](https://github.com/influxdata/influxdb/pull/6952): Fix compaction planning with large TSM files

## v0.13.0 [2016-05-12]

168 changes: 130 additions & 38 deletions tsdb/engine/tsm1/compact.go
Original file line number Diff line number Diff line change
@@ -50,6 +50,7 @@ type CompactionGroup []string
// CompactionPlanner determines which groups of TSM files should be
// compacted together. Each returned CompactionGroup can be compacted
// concurrently with the others.
type CompactionPlanner interface {
	// Plan returns the file groups eligible for a full compaction.
	// lastWrite is the time of the last WAL write (callers pass
	// e.WAL.LastWriteTime()).
	Plan(lastWrite time.Time) []CompactionGroup

	// PlanLevel returns the file groups eligible for compaction at the
	// given level.
	PlanLevel(level int) []CompactionGroup

	// PlanOptimize returns file groups whose TSM indexes can still be
	// optimized by combining files across generations.
	PlanOptimize() []CompactionGroup
}

// DefaultPlanner implements CompactionPlanner using a strategy to roll up
@@ -101,11 +102,9 @@ func (t *tsmGeneration) level() int {
// 1 file with a sequence num of 1. Level 2 is generated by compacting multiple
level 1 files. Level 3 is generated by compacting multiple level 2 files. Level
// 4 is for anything else.
if len(t.files) == 1 {
_, seq, _ := ParseTSMFileName(t.files[0].Path)
if seq < 4 {
return seq
}
_, seq, _ := ParseTSMFileName(t.files[0].Path)
if seq < 4 {
return seq
}

return 4
@@ -143,62 +142,141 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
// split across several files in sequence.
generations := c.findGenerations()

// If there is only one generation and no tombstones, then there's nothing to
// do.
if len(generations) <= 1 && !generations.hasTombstones() {
return nil
}

// Loop through the generations and find the generations matching the requested
// level
var cGroup CompactionGroup
for i := 0; i < len(generations)-1; i++ {
// Group each generation by level such that two adjacent generations in the same
// level become part of the same group.
var currentGen tsmGenerations
var groups []tsmGenerations
for i := 0; i < len(generations); i++ {
cur := generations[i]
next := generations[i+1]

// If the current and next level match the specified level, then add the current level
// to the group
if level == cur.level() && (next.level() == level || cur.hasTombstones()) {
for _, f := range cur.files {
cGroup = append(cGroup, f.Path)
}
if len(currentGen) == 0 || currentGen[0].level() == cur.level() {
currentGen = append(currentGen, cur)
continue
}
groups = append(groups, currentGen)

currentGen = tsmGenerations{}
currentGen = append(currentGen, cur)
}

// Add the last segments if it matches the level
if len(generations) > 0 {
last := generations[len(generations)-1]
if last.level() == level {
for _, f := range last.files {
cGroup = append(cGroup, f.Path)
if len(currentGen) > 0 {
groups = append(groups, currentGen)
}

// Remove any groups in the wrong level
var levelGroups []tsmGenerations
for _, cur := range groups {
if cur[0].level() == level {
levelGroups = append(levelGroups, cur)
}
}

// Determine the minimum number of files required for the level. Higher levels are more
// CPU intensive so we only want to include them when we have enough data to make them
// worthwhile.
// minGenerations 1 -> 2
// minGenerations 2 -> 2
// minGenerations 3 -> 4
// minGenerations 4 -> 4
minGenerations := level
if minGenerations%2 != 0 {
minGenerations = level + 1
}

var cGroups []CompactionGroup
for _, group := range levelGroups {
for _, chunk := range group.chunk(4) {
var cGroup CompactionGroup
var hasTombstones bool
for _, gen := range chunk {
if gen.hasTombstones() {
hasTombstones = true
}
for _, file := range gen.files {
cGroup = append(cGroup, file.Path)
}
}

if len(chunk) < minGenerations && !hasTombstones {
continue
}

cGroups = append(cGroups, cGroup)
}
}

if len(cGroup) == 0 {
return cGroups
}

// PlanOptimize returns all TSM files if they are in different generations in order
// to optimize the index across TSM files. Each returned compaction group can be
// compacted concurrently.
func (c *DefaultPlanner) PlanOptimize() []CompactionGroup {
// Determine the generations from all files on disk. We need to treat
// a generation conceptually as a single file even though it may be
// split across several files in sequence.
generations := c.findGenerations()

// If there is only one generation and no tombstones, then there's nothing to
// do.
if len(generations) <= 1 && !generations.hasTombstones() {
return nil
}

if generations.hasTombstones() {
return []CompactionGroup{cGroup}
// Group each generation by level such that two adjacent generations in the same
// level become part of the same group.
var currentGen tsmGenerations
var groups []tsmGenerations
for i := 0; i < len(generations); i++ {
cur := generations[i]

if len(currentGen) == 0 || currentGen[0].level() == cur.level() {
currentGen = append(currentGen, cur)
continue
}
groups = append(groups, currentGen)

currentGen = tsmGenerations{}
currentGen = append(currentGen, cur)
}

// Ensure we have at least 2 generations. For higher levels, we want to use more files to maximize
// the compression, but we don't want it unbounded since that can cause backups of compactions at that
// level.
// Level 1 -> 2
// Level 2 -> 2
// Level 3 -> 4
// Level 4 -> 4
limit := 2
if level%2 != 0 {
limit = level + 1
if len(currentGen) > 0 {
groups = append(groups, currentGen)
}

if len(cGroup) < limit {
return nil
// Only optimize level 4 files since using lower-levels will collide
// with the level planners
var levelGroups []tsmGenerations
for _, cur := range groups {
if cur[0].level() == 4 {
levelGroups = append(levelGroups, cur)
}
}

var cGroups []CompactionGroup
for _, group := range levelGroups {
// Skip the group if it's not worthwhile to optimize it
if len(group) < 4 && !group.hasTombstones() {
continue
}

var cGroup CompactionGroup
for _, gen := range group {
for _, file := range gen.files {
cGroup = append(cGroup, file.Path)
}
}

cGroups = append(cGroups, cGroup)
}
return []CompactionGroup{cGroup}

return cGroups
}

// Plan returns a set of TSM files to rewrite for level 4 or higher. The planning returns
@@ -1151,3 +1229,17 @@ func (a tsmGenerations) hasTombstones() bool {
}
return false
}

// chunk splits a into consecutive groups of at most size generations each.
// The final chunk holds the remainder when len(a) is not a multiple of size.
// A size of less than 1 returns everything as a single chunk; the previous
// implementation looped forever in that case because the slice never shrank.
func (a tsmGenerations) chunk(size int) []tsmGenerations {
	var chunks []tsmGenerations

	// Guard against a non-positive chunk size, which would otherwise
	// never consume any elements and spin indefinitely.
	if size < 1 {
		if len(a) > 0 {
			chunks = append(chunks, a)
		}
		return chunks
	}

	for len(a) > size {
		chunks = append(chunks, a[:size])
		a = a[size:]
	}
	if len(a) > 0 {
		chunks = append(chunks, a)
	}
	return chunks
}
566 changes: 566 additions & 0 deletions tsdb/engine/tsm1/compact_test.go

Large diffs are not rendered by default.

115 changes: 115 additions & 0 deletions tsdb/engine/tsm1/encoding.gen.go
Original file line number Diff line number Diff line change
@@ -30,6 +30,18 @@ func (a Values) Size() int {
return sz
}

// ordered reports whether the values are sorted by timestamp in strictly
// increasing order, i.e. sorted with no duplicate timestamps.
func (a Values) ordered() bool {
	for i := 1; i < len(a); i++ {
		if a[i-1].UnixNano() >= a[i].UnixNano() {
			return false
		}
	}
	return true
}

func (a Values) assertOrdered() {
if len(a) <= 1 {
return
@@ -98,6 +110,17 @@ func (a Values) Merge(b Values) Values {
return a
}

// Normally, both a and b should not contain duplicates. Due to a bug in older versions, it's
// possible stored blocks might contain duplicate values. Remove them if they exist before
// merging.
if !a.ordered() {
a = a.Deduplicate()
}

if !b.ordered() {
b = b.Deduplicate()
}

if a[len(a)-1].UnixNano() < b[0].UnixNano() {
return append(a, b...)
}
@@ -178,6 +201,18 @@ func (a FloatValues) Size() int {
return sz
}

// ordered reports whether the values are sorted by timestamp in strictly
// increasing order, i.e. sorted with no duplicate timestamps.
func (a FloatValues) ordered() bool {
	for i := 1; i < len(a); i++ {
		if a[i-1].UnixNano() >= a[i].UnixNano() {
			return false
		}
	}
	return true
}

func (a FloatValues) assertOrdered() {
if len(a) <= 1 {
return
@@ -246,6 +281,17 @@ func (a FloatValues) Merge(b FloatValues) FloatValues {
return a
}

// Normally, both a and b should not contain duplicates. Due to a bug in older versions, it's
// possible stored blocks might contain duplicate values. Remove them if they exist before
// merging.
if !a.ordered() {
a = a.Deduplicate()
}

if !b.ordered() {
b = b.Deduplicate()
}

if a[len(a)-1].UnixNano() < b[0].UnixNano() {
return append(a, b...)
}
@@ -326,6 +372,18 @@ func (a IntegerValues) Size() int {
return sz
}

// ordered reports whether the values are sorted by timestamp in strictly
// increasing order, i.e. sorted with no duplicate timestamps.
func (a IntegerValues) ordered() bool {
	for i := 1; i < len(a); i++ {
		if a[i-1].UnixNano() >= a[i].UnixNano() {
			return false
		}
	}
	return true
}

func (a IntegerValues) assertOrdered() {
if len(a) <= 1 {
return
@@ -394,6 +452,17 @@ func (a IntegerValues) Merge(b IntegerValues) IntegerValues {
return a
}

// Normally, both a and b should not contain duplicates. Due to a bug in older versions, it's
// possible stored blocks might contain duplicate values. Remove them if they exist before
// merging.
if !a.ordered() {
a = a.Deduplicate()
}

if !b.ordered() {
b = b.Deduplicate()
}

if a[len(a)-1].UnixNano() < b[0].UnixNano() {
return append(a, b...)
}
@@ -474,6 +543,18 @@ func (a StringValues) Size() int {
return sz
}

// ordered reports whether the values are sorted by timestamp in strictly
// increasing order, i.e. sorted with no duplicate timestamps.
func (a StringValues) ordered() bool {
	for i := 1; i < len(a); i++ {
		if a[i-1].UnixNano() >= a[i].UnixNano() {
			return false
		}
	}
	return true
}

func (a StringValues) assertOrdered() {
if len(a) <= 1 {
return
@@ -542,6 +623,17 @@ func (a StringValues) Merge(b StringValues) StringValues {
return a
}

// Normally, both a and b should not contain duplicates. Due to a bug in older versions, it's
// possible stored blocks might contain duplicate values. Remove them if they exist before
// merging.
if !a.ordered() {
a = a.Deduplicate()
}

if !b.ordered() {
b = b.Deduplicate()
}

if a[len(a)-1].UnixNano() < b[0].UnixNano() {
return append(a, b...)
}
@@ -622,6 +714,18 @@ func (a BooleanValues) Size() int {
return sz
}

// ordered reports whether the values are sorted by timestamp in strictly
// increasing order, i.e. sorted with no duplicate timestamps.
func (a BooleanValues) ordered() bool {
	for i := 1; i < len(a); i++ {
		if a[i-1].UnixNano() >= a[i].UnixNano() {
			return false
		}
	}
	return true
}

func (a BooleanValues) assertOrdered() {
if len(a) <= 1 {
return
@@ -690,6 +794,17 @@ func (a BooleanValues) Merge(b BooleanValues) BooleanValues {
return a
}

// Normally, both a and b should not contain duplicates. Due to a bug in older versions, it's
// possible stored blocks might contain duplicate values. Remove them if they exist before
// merging.
if !a.ordered() {
a = a.Deduplicate()
}

if !b.ordered() {
b = b.Deduplicate()
}

if a[len(a)-1].UnixNano() < b[0].UnixNano() {
return append(a, b...)
}
23 changes: 23 additions & 0 deletions tsdb/engine/tsm1/encoding.gen.go.tmpl
Original file line number Diff line number Diff line change
@@ -26,6 +26,18 @@ func (a {{.Name}}Values) Size() int {
return sz
}

func (a {{.Name}}Values) ordered() bool {
if len(a) <= 1 {
return true
}
for i := 1; i < len(a); i++ {
if av, ab := a[i-1].UnixNano(), a[i].UnixNano(); av >= ab {
return false
}
}
return true
}

func (a {{.Name}}Values) assertOrdered() {
if len(a) <= 1 {
return
@@ -95,6 +107,17 @@ func (a {{.Name}}Values) Merge(b {{.Name}}Values) {{.Name}}Values {
return a
}

// Normally, both a and b should not contain duplicates. Due to a bug in older versions, it's
// possible stored blocks might contain duplicate values. Remove them if they exist before
// merging.
if !a.ordered() {
a = a.Deduplicate()
}

if !b.ordered() {
b = b.Deduplicate()
}

if a[len(a)-1].UnixNano() < b[0].UnixNano() {
return append(a, b...)
}
35 changes: 34 additions & 1 deletion tsdb/engine/tsm1/encoding_test.go
Original file line number Diff line number Diff line change
@@ -285,6 +285,40 @@ func TestValues_MergeFloat(t *testing.T) {
tsm1.NewValue(2, 2.1),
},
},
{
a: []tsm1.Value{
tsm1.NewValue(0, 0.0),
tsm1.NewValue(1, 1.1),
tsm1.NewValue(2, 2.1),
},
b: []tsm1.Value{
tsm1.NewValue(2, 2.2),
tsm1.NewValue(2, 2.2), // duplicate data
},
exp: []tsm1.Value{
tsm1.NewValue(0, 0.0),
tsm1.NewValue(1, 1.1),
tsm1.NewValue(2, 2.2),
},
},
{
a: []tsm1.Value{
tsm1.NewValue(0, 0.0),
tsm1.NewValue(1, 1.1),
tsm1.NewValue(1, 1.1), // duplicate data
tsm1.NewValue(2, 2.1),
},
b: []tsm1.Value{
tsm1.NewValue(2, 2.2),
tsm1.NewValue(2, 2.2), // duplicate data
},
exp: []tsm1.Value{
tsm1.NewValue(0, 0.0),
tsm1.NewValue(1, 1.1),
tsm1.NewValue(2, 2.2),
},
},

{
a: []tsm1.Value{
tsm1.NewValue(1, 1.1),
@@ -454,7 +488,6 @@ func TestValues_MergeFloat(t *testing.T) {

for i, test := range tests {
got := tsm1.Values(test.a).Merge(test.b)
spew.Dump(got)

if exp, got := len(test.exp), len(got); exp != got {
t.Fatalf("test(%d): value length mismatch: exp %v, got %v", i, exp, got)
105 changes: 72 additions & 33 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
@@ -39,17 +39,19 @@ const (

// Statistics gathered by the engine.
const (
statCacheCompactions = "cacheCompactions"
statCacheCompactionError = "cacheCompactionErr"
statCacheCompactionDuration = "cacheCompactionDuration"
statTSMLevel1Compactions = "tsmLevel1Compactions"
statTSMLevel1CompactionDuration = "tsmLevel1CompactionDuration"
statTSMLevel2Compactions = "tsmLevel2Compactions"
statTSMLevel2CompactionDuration = "tsmLevel2CompactionDuration"
statTSMLevel3Compactions = "tsmLevel3Compactions"
statTSMLevel3CompactionDuration = "tsmLevel3CompactionDuration"
statTSMFullCompactions = "tsmFullCompactions"
statTSMFullCompactionDuration = "tsmFullCompactionDuration"
statCacheCompactions = "cacheCompactions"
statCacheCompactionError = "cacheCompactionErr"
statCacheCompactionDuration = "cacheCompactionDuration"
statTSMLevel1Compactions = "tsmLevel1Compactions"
statTSMLevel1CompactionDuration = "tsmLevel1CompactionDuration"
statTSMLevel2Compactions = "tsmLevel2Compactions"
statTSMLevel2CompactionDuration = "tsmLevel2CompactionDuration"
statTSMLevel3Compactions = "tsmLevel3Compactions"
statTSMLevel3CompactionDuration = "tsmLevel3CompactionDuration"
statTSMOptimizeCompactions = "tsmOptimizeCompactions"
statTSMOptimizeCompactionDuration = "tsmOptimizeCompactionDuration"
statTSMFullCompactions = "tsmFullCompactions"
statTSMFullCompactionDuration = "tsmFullCompactionDuration"
)

// Engine represents a storage engine with compressed blocks.
@@ -219,15 +221,18 @@ func (e *Engine) Format() tsdb.EngineFormat {

// EngineStatistics maintains statistics for the engine.
type EngineStatistics struct {
CacheCompactions int64
CacheCompactionErrors int64
CacheCompactionDuration int64
TSMCompactions [3]int64
TSMCompactionErrors [3]int64
TSMCompactionDuration [3]int64
TSMFullCompactions int64
TSMFullCompactionErrors int64
TSMFullCompactionDuration int64
CacheCompactions int64
CacheCompactionErrors int64
CacheCompactionDuration int64
TSMCompactions [3]int64
TSMCompactionErrors [3]int64
TSMCompactionDuration [3]int64
TSMOptimizeCompactions int64
TSMOptimizeCompactionErrors int64
TSMOptimizeCompactionDuration int64
TSMFullCompactions int64
TSMFullCompactionErrors int64
TSMFullCompactionDuration int64
}

// Statistics returns statistics for periodic monitoring.
@@ -875,8 +880,16 @@ func (e *Engine) compactTSMFull() {
return

default:
optimize := false
logDesc := "full"
tsmFiles := e.CompactionPlan.Plan(e.WAL.LastWriteTime())

if len(tsmFiles) == 0 {
optimize = true
logDesc = "optimize"
tsmFiles = e.CompactionPlan.PlanOptimize()
}

if len(tsmFiles) == 0 {
time.Sleep(time.Second)
continue
@@ -891,17 +904,33 @@ func (e *Engine) compactTSMFull() {
go func(groupNum int, group CompactionGroup) {
defer wg.Done()
start := time.Now()
e.logger.Printf("beginning full compaction of group %d, %d TSM files", groupNum, len(group))
e.logger.Printf("beginning %s compaction of group %d, %d TSM files", logDesc, groupNum, len(group))
for i, f := range group {
e.logger.Printf("compacting full group (%d) %s (#%d)", groupNum, f, i)
e.logger.Printf("compacting %s group (%d) %s (#%d)", logDesc, groupNum, f, i)
}

files, err := e.Compactor.CompactFull(group)
if err != nil {
e.logger.Printf("error compacting TSM files: %v", err)
atomic.AddInt64(&e.stats.TSMFullCompactionErrors, 1)
time.Sleep(time.Second)
return
var (
files []string
err error
)
if optimize {
files, err = e.Compactor.CompactFast(group)
if err != nil {
e.logger.Printf("error compacting TSM files: %v", err)
atomic.AddInt64(&e.stats.TSMOptimizeCompactionErrors, 1)

time.Sleep(time.Second)
return
}
} else {
files, err = e.Compactor.CompactFull(group)
if err != nil {
e.logger.Printf("error compacting TSM files: %v", err)
atomic.AddInt64(&e.stats.TSMFullCompactionErrors, 1)

time.Sleep(time.Second)
return
}
}

if err := e.FileStore.Replace(group, files); err != nil {
@@ -912,17 +941,27 @@ func (e *Engine) compactTSMFull() {
}

for i, f := range files {
e.logger.Printf("compacted full group (%d) into %s (#%d)", groupNum, f, i)
e.logger.Printf("compacted %s group (%d) into %s (#%d)", logDesc, groupNum, f, i)
}
atomic.AddInt64(&e.stats.TSMFullCompactions, 1)
e.logger.Printf("compacted full %d files into %d files in %s",
len(group), len(files), time.Since(start))

if optimize {
atomic.AddInt64(&e.stats.TSMOptimizeCompactions, 1)
} else {
atomic.AddInt64(&e.stats.TSMFullCompactions, 1)
}
e.logger.Printf("compacted %s %d files into %d files in %s",
logDesc, len(group), len(files), time.Since(start))
}(i, group)
}
wg.Wait()

// Track the amount of time spent compacting the groups.
atomic.AddInt64(&e.stats.TSMFullCompactionDuration, time.Since(start).Nanoseconds())
if optimize {
atomic.AddInt64(&e.stats.TSMOptimizeCompactionDuration, time.Since(start).Nanoseconds())
} else {
atomic.AddInt64(&e.stats.TSMFullCompactionDuration, time.Since(start).Nanoseconds())
}

}
}
}
1 change: 1 addition & 0 deletions tsdb/engine/tsm1/engine_test.go
Original file line number Diff line number Diff line change
@@ -783,6 +783,7 @@ type mockPlanner struct{}

// Plan always returns no compaction groups; mockPlanner is a no-op planner for tests.
func (m *mockPlanner) Plan(lastWrite time.Time) []tsm1.CompactionGroup { return nil }
// PlanLevel always returns no compaction groups.
func (m *mockPlanner) PlanLevel(level int) []tsm1.CompactionGroup { return nil }
// PlanOptimize always returns no compaction groups.
func (m *mockPlanner) PlanOptimize() []tsm1.CompactionGroup { return nil }

// ParseTags returns an instance of Tags for a comma-delimited list of key/values.
func ParseTags(s string) influxql.Tags {