Skip to content

Commit

Permalink
Fix panic in full compactions due to duplciate data in blocks
Browse files Browse the repository at this point in the history
Due to a bug in compactions, it's possible some blocks may have duplicate
points stored.  If those blocks are decoded and re-compacted, an assertion
panic could trigger.

We now dedup those blocks if necessary to remove the duplicate points
and avoid the panic.
  • Loading branch information
jwilder committed Jul 2, 2016
1 parent 341e6bb commit 3c04ec5
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 1 deletion.
115 changes: 115 additions & 0 deletions tsdb/engine/tsm1/encoding.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ func (a Values) Size() int {
return sz
}

func (a Values) ordered() bool {
if len(a) <= 1 {
return true
}
for i := 1; i < len(a); i++ {
if av, ab := a[i-1].UnixNano(), a[i].UnixNano(); av >= ab {
return false
}
}
return true
}

func (a Values) assertOrdered() {
if len(a) <= 1 {
return
Expand Down Expand Up @@ -98,6 +110,17 @@ func (a Values) Merge(b Values) Values {
return a
}

// Normally, both a and b should not contain duplicates. Due to a bug in older versions, it's
// possible stored blocks might contain duplicate values. Remove them if they exists before
// merging.
if !a.ordered() {
a = a.Deduplicate()
}

if !b.ordered() {
b = b.Deduplicate()
}

if a[len(a)-1].UnixNano() < b[0].UnixNano() {
return append(a, b...)
}
Expand Down Expand Up @@ -178,6 +201,18 @@ func (a FloatValues) Size() int {
return sz
}

func (a FloatValues) ordered() bool {
if len(a) <= 1 {
return true
}
for i := 1; i < len(a); i++ {
if av, ab := a[i-1].UnixNano(), a[i].UnixNano(); av >= ab {
return false
}
}
return true
}

func (a FloatValues) assertOrdered() {
if len(a) <= 1 {
return
Expand Down Expand Up @@ -246,6 +281,17 @@ func (a FloatValues) Merge(b FloatValues) FloatValues {
return a
}

// Normally, both a and b should not contain duplicates. Due to a bug in older versions, it's
// possible stored blocks might contain duplicate values. Remove them if they exists before
// merging.
if !a.ordered() {
a = a.Deduplicate()
}

if !b.ordered() {
b = b.Deduplicate()
}

if a[len(a)-1].UnixNano() < b[0].UnixNano() {
return append(a, b...)
}
Expand Down Expand Up @@ -326,6 +372,18 @@ func (a IntegerValues) Size() int {
return sz
}

func (a IntegerValues) ordered() bool {
if len(a) <= 1 {
return true
}
for i := 1; i < len(a); i++ {
if av, ab := a[i-1].UnixNano(), a[i].UnixNano(); av >= ab {
return false
}
}
return true
}

func (a IntegerValues) assertOrdered() {
if len(a) <= 1 {
return
Expand Down Expand Up @@ -394,6 +452,17 @@ func (a IntegerValues) Merge(b IntegerValues) IntegerValues {
return a
}

// Normally, both a and b should not contain duplicates. Due to a bug in older versions, it's
// possible stored blocks might contain duplicate values. Remove them if they exists before
// merging.
if !a.ordered() {
a = a.Deduplicate()
}

if !b.ordered() {
b = b.Deduplicate()
}

if a[len(a)-1].UnixNano() < b[0].UnixNano() {
return append(a, b...)
}
Expand Down Expand Up @@ -474,6 +543,18 @@ func (a StringValues) Size() int {
return sz
}

func (a StringValues) ordered() bool {
if len(a) <= 1 {
return true
}
for i := 1; i < len(a); i++ {
if av, ab := a[i-1].UnixNano(), a[i].UnixNano(); av >= ab {
return false
}
}
return true
}

func (a StringValues) assertOrdered() {
if len(a) <= 1 {
return
Expand Down Expand Up @@ -542,6 +623,17 @@ func (a StringValues) Merge(b StringValues) StringValues {
return a
}

// Normally, both a and b should not contain duplicates. Due to a bug in older versions, it's
// possible stored blocks might contain duplicate values. Remove them if they exists before
// merging.
if !a.ordered() {
a = a.Deduplicate()
}

if !b.ordered() {
b = b.Deduplicate()
}

if a[len(a)-1].UnixNano() < b[0].UnixNano() {
return append(a, b...)
}
Expand Down Expand Up @@ -622,6 +714,18 @@ func (a BooleanValues) Size() int {
return sz
}

func (a BooleanValues) ordered() bool {
if len(a) <= 1 {
return true
}
for i := 1; i < len(a); i++ {
if av, ab := a[i-1].UnixNano(), a[i].UnixNano(); av >= ab {
return false
}
}
return true
}

func (a BooleanValues) assertOrdered() {
if len(a) <= 1 {
return
Expand Down Expand Up @@ -690,6 +794,17 @@ func (a BooleanValues) Merge(b BooleanValues) BooleanValues {
return a
}

// Normally, both a and b should not contain duplicates. Due to a bug in older versions, it's
// possible stored blocks might contain duplicate values. Remove them if they exists before
// merging.
if !a.ordered() {
a = a.Deduplicate()
}

if !b.ordered() {
b = b.Deduplicate()
}

if a[len(a)-1].UnixNano() < b[0].UnixNano() {
return append(a, b...)
}
Expand Down
23 changes: 23 additions & 0 deletions tsdb/engine/tsm1/encoding.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ func (a {{.Name}}Values) Size() int {
return sz
}

func (a {{.Name}}Values) ordered() bool {
if len(a) <= 1 {
return true
}
for i := 1; i < len(a); i++ {
if av, ab := a[i-1].UnixNano(), a[i].UnixNano(); av >= ab {
return false
}
}
return true
}

func (a {{.Name}}Values) assertOrdered() {
if len(a) <= 1 {
return
Expand Down Expand Up @@ -95,6 +107,17 @@ func (a {{.Name}}Values) Merge(b {{.Name}}Values) {{.Name}}Values {
return a
}

// Normally, both a and b should not contain duplicates. Due to a bug in older versions, it's
// possible stored blocks might contain duplicate values. Remove them if they exists before
// merging.
if !a.ordered() {
a = a.Deduplicate()
}

if !b.ordered() {
b = b.Deduplicate()
}

if a[len(a)-1].UnixNano() < b[0].UnixNano() {
return append(a, b...)
}
Expand Down
35 changes: 34 additions & 1 deletion tsdb/engine/tsm1/encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,40 @@ func TestValues_MergeFloat(t *testing.T) {
tsm1.NewValue(2, 2.1),
},
},
{
a: []tsm1.Value{
tsm1.NewValue(0, 0.0),
tsm1.NewValue(1, 1.1),
tsm1.NewValue(2, 2.1),
},
b: []tsm1.Value{
tsm1.NewValue(2, 2.2),
tsm1.NewValue(2, 2.2), // duplicate data
},
exp: []tsm1.Value{
tsm1.NewValue(0, 0.0),
tsm1.NewValue(1, 1.1),
tsm1.NewValue(2, 2.2),
},
},
{
a: []tsm1.Value{
tsm1.NewValue(0, 0.0),
tsm1.NewValue(1, 1.1),
tsm1.NewValue(1, 1.1), // duplicate data
tsm1.NewValue(2, 2.1),
},
b: []tsm1.Value{
tsm1.NewValue(2, 2.2),
tsm1.NewValue(2, 2.2), // duplicate data
},
exp: []tsm1.Value{
tsm1.NewValue(0, 0.0),
tsm1.NewValue(1, 1.1),
tsm1.NewValue(2, 2.2),
},
},

{
a: []tsm1.Value{
tsm1.NewValue(1, 1.1),
Expand Down Expand Up @@ -454,7 +488,6 @@ func TestValues_MergeFloat(t *testing.T) {

for i, test := range tests {
got := tsm1.Values(test.a).Merge(test.b)
spew.Dump(got)

if exp, got := len(test.exp), len(got); exp != got {
t.Fatalf("test(%d): value length mismatch: exp %v, got %v", i, exp, got)
Expand Down

0 comments on commit 3c04ec5

Please sign in to comment.