diff --git a/services/meta/client.go b/services/meta/client.go index 0ddf966e502..8dad1d6732a 100644 --- a/services/meta/client.go +++ b/services/meta/client.go @@ -694,11 +694,19 @@ func (c *Client) DropShard(id uint64) error { // CreateShardGroup creates a shard group on a database and policy for a given timestamp. func (c *Client) CreateShardGroup(database, policy string, timestamp time.Time) (*ShardGroupInfo, error) { + // Check under a read-lock + c.mu.RLock() + if sg, _ := c.cacheData.ShardGroupByTimestamp(database, policy, timestamp); sg != nil { + c.mu.RUnlock() + return sg, nil + } + c.mu.RUnlock() + c.mu.Lock() defer c.mu.Unlock() + // Check again under the write lock data := c.cacheData.Clone() - if sg, _ := data.ShardGroupByTimestamp(database, policy, timestamp); sg != nil { return sg, nil } diff --git a/tsdb/engine/tsm1/encoding.go b/tsdb/engine/tsm1/encoding.go index 798f163a041..4ad345545ea 100644 --- a/tsdb/engine/tsm1/encoding.go +++ b/tsdb/engine/tsm1/encoding.go @@ -111,26 +111,45 @@ func (a Values) Merge(b Values) Values { return a } - var i, j int - for ; i < len(a) && j < len(b); i++ { - av, bv := a[i].UnixNano(), b[j].UnixNano() + for i := 0; i < len(a) && len(b) > 0; i++ { + av, bv := a[i].UnixNano(), b[0].UnixNano() + // Value in a is greater than B, we need to merge if av > bv { - a[i], b[j] = b[j], a[i] + // Save value in a + temp := a[i] + + // Overwrite a with b + a[i] = b[0] + + // Slide all values of b down 1 + copy(b, b[1:]) + b = b[:len(b)-1] + + // See where value we save from a should be inserted in b to keep b sorted + k := sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() }) + + if k == len(b) { + // Last position? + b = append(b, temp) + } else if b[k].UnixNano() != temp.UnixNano() { + // Save the last element, since it will get overwritten + last := b[len(b)-1] + // Somewhere in the middle of b, insert it only if it's not a duplicate + copy(b[k+1:], b[k:]) + // Add the last vale to the end + b = append(b, last) + b[k] = temp + } } else if av == bv { - a[i] = b[j] - j++ + // Value in a an b are the same, use b + a[i] = b[0] + b = b[1:] } } - sort.Sort(b[j:]) - - if i >= len(a) { - if j+1 < len(b) && b[j].UnixNano() == b[j+1].UnixNano() { - j++ - } - return append(a, b[j:]...) + if len(b) > 0 { + return append(a, b...) } - return a } @@ -442,24 +461,45 @@ func (a FloatValues) Merge(b FloatValues) FloatValues { return a } - var i, j int - for ; i < len(a) && j < len(b); i++ { - av, bv := a[i].UnixNano(), b[j].UnixNano() + for i := 0; i < len(a) && len(b) > 0; i++ { + av, bv := a[i].UnixNano(), b[0].UnixNano() + // Value in a is greater than B, we need to merge if av > bv { - a[i], b[j] = b[j], a[i] + // Save value in a + temp := a[i] + + // Overwrite a with b + a[i] = b[0] + + // Slide all values of b down 1 + copy(b, b[1:]) + b = b[:len(b)-1] + + // See where value we save from a should be inserted in b to keep b sorted + k := sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() }) + + if k == len(b) { + // Last position? + b = append(b, temp) + } else if b[k].UnixNano() != temp.UnixNano() { + // Save the last element, since it will get overwritten + last := b[len(b)-1] + // Somewhere in the middle of b, insert it only if it's not a duplicate + copy(b[k+1:], b[k:]) + // Add the last vale to the end + b = append(b, last) + b[k] = temp + } } else if av == bv { - a[i] = b[j] - j++ + // Value in a an b are the same, use b + a[i] = b[0] + b = b[1:] } } - if i >= len(a) { - if j+1 < len(b) && b[j].UnixNano() == b[j+1].UnixNano() { - j++ - } - return append(a, b[j:]...) + if len(b) > 0 { + return append(a, b...) } - return a } @@ -655,24 +695,45 @@ func (a BooleanValues) Merge(b BooleanValues) BooleanValues { return a } - var i, j int - for ; i < len(a) && j < len(b); i++ { - av, bv := a[i].UnixNano(), b[j].UnixNano() + for i := 0; i < len(a) && len(b) > 0; i++ { + av, bv := a[i].UnixNano(), b[0].UnixNano() + // Value in a is greater than B, we need to merge if av > bv { - a[i], b[j] = b[j], a[i] + // Save value in a + temp := a[i] + + // Overwrite a with b + a[i] = b[0] + + // Slide all values of b down 1 + copy(b, b[1:]) + b = b[:len(b)-1] + + // See where value we save from a should be inserted in b to keep b sorted + k := sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() }) + + if k == len(b) { + // Last position? + b = append(b, temp) + } else if b[k].UnixNano() != temp.UnixNano() { + // Save the last element, since it will get overwritten + last := b[len(b)-1] + // Somewhere in the middle of b, insert it only if it's not a duplicate + copy(b[k+1:], b[k:]) + // Add the last vale to the end + b = append(b, last) + b[k] = temp + } } else if av == bv { - a[i] = b[j] - j++ + // Value in a an b are the same, use b + a[i] = b[0] + b = b[1:] } } - if i >= len(a) { - if j+1 < len(b) && b[j].UnixNano() == b[j+1].UnixNano() { - j++ - } - return append(a, b[j:]...) + if len(b) > 0 { + return append(a, b...) } - return a } @@ -828,24 +889,45 @@ func (a IntegerValues) Merge(b IntegerValues) IntegerValues { return a } - var i, j int - for ; i < len(a) && j < len(b); i++ { - av, bv := a[i].UnixNano(), b[j].UnixNano() + for i := 0; i < len(a) && len(b) > 0; i++ { + av, bv := a[i].UnixNano(), b[0].UnixNano() + // Value in a is greater than B, we need to merge if av > bv { - a[i], b[j] = b[j], a[i] + // Save value in a + temp := a[i] + + // Overwrite a with b + a[i] = b[0] + + // Slide all values of b down 1 + copy(b, b[1:]) + b = b[:len(b)-1] + + // See where value we save from a should be inserted in b to keep b sorted + k := sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() }) + + if k == len(b) { + // Last position? + b = append(b, temp) + } else if b[k].UnixNano() != temp.UnixNano() { + // Save the last element, since it will get overwritten + last := b[len(b)-1] + // Somewhere in the middle of b, insert it only if it's not a duplicate + copy(b[k+1:], b[k:]) + // Add the last vale to the end + b = append(b, last) + b[k] = temp + } } else if av == bv { - a[i] = b[j] - j++ + // Value in a an b are the same, use b + a[i] = b[0] + b = b[1:] } } - if i >= len(a) { - if j+1 < len(b) && b[j].UnixNano() == b[j+1].UnixNano() { - j++ - } - return append(a, b[j:]...) + if len(b) > 0 { + return append(a, b...) } - return a } @@ -1003,24 +1085,45 @@ func (a StringValues) Merge(b StringValues) StringValues { return a } - var i, j int - for ; i < len(a) && j < len(b); i++ { - av, bv := a[i].UnixNano(), b[j].UnixNano() + for i := 0; i < len(a) && len(b) > 0; i++ { + av, bv := a[i].UnixNano(), b[0].UnixNano() + // Value in a is greater than B, we need to merge if av > bv { - a[i], b[j] = b[j], a[i] + // Save value in a + temp := a[i] + + // Overwrite a with b + a[i] = b[0] + + // Slide all values of b down 1 + copy(b, b[1:]) + b = b[:len(b)-1] + + // See where value we save from a should be inserted in b to keep b sorted + k := sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() }) + + if k == len(b) { + // Last position? + b = append(b, temp) + } else if b[k].UnixNano() != temp.UnixNano() { + // Save the last element, since it will get overwritten + last := b[len(b)-1] + // Somewhere in the middle of b, insert it only if it's not a duplicate + copy(b[k+1:], b[k:]) + // Add the last vale to the end + b = append(b, last) + b[k] = temp + } } else if av == bv { - a[i] = b[j] - j++ + // Value in a an b are the same, use b + a[i] = b[0] + b = b[1:] } } - if i >= len(a) { - if j+1 < len(b) && b[j].UnixNano() == b[j+1].UnixNano() { - j++ - } - return append(a, b[j:]...) + if len(b) > 0 { + return append(a, b...) } - return a } diff --git a/tsdb/engine/tsm1/encoding_test.go b/tsdb/engine/tsm1/encoding_test.go index fbe01280357..3c63b9bf59a 100644 --- a/tsdb/engine/tsm1/encoding_test.go +++ b/tsdb/engine/tsm1/encoding_test.go @@ -386,6 +386,70 @@ func TestValues_MergeFloat(t *testing.T) { tsm1.NewValue(1462498658288956853, 1.1), }, }, + { + a: []tsm1.Value{ + tsm1.NewValue(4, 4.0), + tsm1.NewValue(5, 5.0), + tsm1.NewValue(6, 6.0), + }, + b: []tsm1.Value{ + tsm1.NewValue(1, 1.0), + tsm1.NewValue(2, 2.0), + tsm1.NewValue(3, 3.0), + }, + exp: []tsm1.Value{ + tsm1.NewValue(1, 1.0), + tsm1.NewValue(2, 2.0), + tsm1.NewValue(3, 3.0), + tsm1.NewValue(4, 4.0), + tsm1.NewValue(5, 5.0), + tsm1.NewValue(6, 6.0), + }, + }, + { + a: []tsm1.Value{ + tsm1.NewValue(5, 5.0), + tsm1.NewValue(6, 6.0), + }, + b: []tsm1.Value{ + tsm1.NewValue(1, 1.0), + tsm1.NewValue(2, 2.0), + tsm1.NewValue(3, 3.0), + tsm1.NewValue(4, 4.0), + tsm1.NewValue(7, 7.0), + tsm1.NewValue(8, 8.0), + }, + exp: []tsm1.Value{ + tsm1.NewValue(1, 1.0), + tsm1.NewValue(2, 2.0), + tsm1.NewValue(3, 3.0), + tsm1.NewValue(4, 4.0), + tsm1.NewValue(5, 5.0), + tsm1.NewValue(6, 6.0), + tsm1.NewValue(7, 7.0), + tsm1.NewValue(8, 8.0), + }, + }, + { + a: []tsm1.Value{ + tsm1.NewValue(1, 1.0), + tsm1.NewValue(2, 2.0), + tsm1.NewValue(3, 3.0), + }, + b: []tsm1.Value{ + tsm1.NewValue(4, 4.0), + tsm1.NewValue(5, 5.0), + tsm1.NewValue(6, 6.0), + }, + exp: []tsm1.Value{ + tsm1.NewValue(1, 1.0), + tsm1.NewValue(2, 2.0), + tsm1.NewValue(3, 3.0), + tsm1.NewValue(4, 4.0), + tsm1.NewValue(5, 5.0), + tsm1.NewValue(6, 6.0), + }, + }, } for i, test := range tests { @@ -1226,3 +1290,20 @@ func BenchmarkValues_Deduplicate(b *testing.B) { tsm1.Values(values).Deduplicate() } } + +func BenchmarkValues_Merge(b *testing.B) { + valueCount := 1000 + times := getTimes(valueCount, 60, time.Second) + a := make([]tsm1.Value, len(times)) + c := make([]tsm1.Value, len(times)) + + for i, t := range times { + a[i] = tsm1.NewValue(t, float64(i)) + c[i] = tsm1.NewValue(t+1, float64(i)) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + tsm1.Values(a).Merge(c) + } +}