Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for merge values #6587

Merged
merged 3 commits into from
May 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion services/meta/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,11 +694,19 @@ func (c *Client) DropShard(id uint64) error {

// CreateShardGroup creates a shard group on a database and policy for a given timestamp.
func (c *Client) CreateShardGroup(database, policy string, timestamp time.Time) (*ShardGroupInfo, error) {
// Check under a read-lock
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a little more detail explaining the reason for the optimistic check under read lock would be worthwhile.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CreateShardGroup is actually CreateShardGroupIfNotExists and shard groups are checked for existence much more frequently than they are created. The write lock showed up in the block profile when writing points w/ many fields.

c.mu.RLock()
if sg, _ := c.cacheData.ShardGroupByTimestamp(database, policy, timestamp); sg != nil {
c.mu.RUnlock()
return sg, nil
}
c.mu.RUnlock()

c.mu.Lock()
defer c.mu.Unlock()

// Check again under the write lock
data := c.cacheData.Clone()

if sg, _ := data.ShardGroupByTimestamp(database, policy, timestamp); sg != nil {
return sg, nil
}
Expand Down
227 changes: 165 additions & 62 deletions tsdb/engine/tsm1/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,26 +111,45 @@ func (a Values) Merge(b Values) Values {
return a
}

var i, j int
for ; i < len(a) && j < len(b); i++ {
av, bv := a[i].UnixNano(), b[j].UnixNano()
for i := 0; i < len(a) && len(b) > 0; i++ {
av, bv := a[i].UnixNano(), b[0].UnixNano()
// Value in a is greater than B, we need to merge
if av > bv {
a[i], b[j] = b[j], a[i]
// Save value in a
temp := a[i]

// Overwrite a with b
a[i] = b[0]

// Slide all values of b down 1
copy(b, b[1:])
b = b[:len(b)-1]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a thought, but is it worth waiting until we know what to do with the merge, before we reduce the length of b?
Something like this maybe?

copy(b, b[1:])

// See where.....
k := sort.Search(len(b)-1, func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })

// b is currently 1 element longer than possibly needed.
if k == len(b)-1 {
    b[k] = temp
} else if .... {
    last := b[len(b)-2]
    copy(b[k+1:], b[k:])
    b[len(b)-1] = last
    b[k] = temp
} else {
    // Reduce the length of b.
    b = b[:len(b)-1]
}

Just a thought, I've not tested it or anything! Could reduce some allocations here though if the logic makes sense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't actually allocate in this loop. It's just updating the slice length and shifting values. The underlying storage does not change until L151 where it need to append the rest of b onto a.

I pushed up a benchmark to verify.

$ go test -run non -bench BenchmarkValues_Merge -benchmem -v ./tsdb/engine/tsm1/
PASS
BenchmarkValues_Merge-8   100000         16167 ns/op       32769 B/op          1 allocs/op

Removing the append on L151

$ go test -run non -bench BenchmarkValues_Merge -benchmem -v ./tsdb/engine/tsm1/
PASS
BenchmarkValues_Merge-8   300000          5178 ns/op           0 B/op          0 allocs/op

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doh of course. There will always be the extra capacity needed to avoid the allocation...


// See where value we save from a should be inserted in b to keep b sorted
k := sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })

if k == len(b) {
// Last position?
b = append(b, temp)
} else if b[k].UnixNano() != temp.UnixNano() {
// Save the last element, since it will get overwritten
last := b[len(b)-1]
// Somewhere in the middle of b, insert it only if it's not a duplicate
copy(b[k+1:], b[k:])
// Add the last vale to the end
b = append(b, last)
b[k] = temp
}
} else if av == bv {
a[i] = b[j]
j++
// Value in a an b are the same, use b
a[i] = b[0]
b = b[1:]
}
}

sort.Sort(b[j:])

if i >= len(a) {
if j+1 < len(b) && b[j].UnixNano() == b[j+1].UnixNano() {
j++
}
return append(a, b[j:]...)
if len(b) > 0 {
return append(a, b...)
}

return a
}

Expand Down Expand Up @@ -442,24 +461,45 @@ func (a FloatValues) Merge(b FloatValues) FloatValues {
return a
}

var i, j int
for ; i < len(a) && j < len(b); i++ {
av, bv := a[i].UnixNano(), b[j].UnixNano()
for i := 0; i < len(a) && len(b) > 0; i++ {
av, bv := a[i].UnixNano(), b[0].UnixNano()
// Value in a is greater than B, we need to merge
if av > bv {
a[i], b[j] = b[j], a[i]
// Save value in a
temp := a[i]

// Overwrite a with b
a[i] = b[0]

// Slide all values of b down 1
copy(b, b[1:])
b = b[:len(b)-1]

// See where value we save from a should be inserted in b to keep b sorted
k := sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })

if k == len(b) {
// Last position?
b = append(b, temp)
} else if b[k].UnixNano() != temp.UnixNano() {
// Save the last element, since it will get overwritten
last := b[len(b)-1]
// Somewhere in the middle of b, insert it only if it's not a duplicate
copy(b[k+1:], b[k:])
// Add the last vale to the end
b = append(b, last)
b[k] = temp
}
} else if av == bv {
a[i] = b[j]
j++
// Value in a an b are the same, use b
a[i] = b[0]
b = b[1:]
}
}

if i >= len(a) {
if j+1 < len(b) && b[j].UnixNano() == b[j+1].UnixNano() {
j++
}
return append(a, b[j:]...)
if len(b) > 0 {
return append(a, b...)
}

return a
}

Expand Down Expand Up @@ -655,24 +695,45 @@ func (a BooleanValues) Merge(b BooleanValues) BooleanValues {
return a
}

var i, j int
for ; i < len(a) && j < len(b); i++ {
av, bv := a[i].UnixNano(), b[j].UnixNano()
for i := 0; i < len(a) && len(b) > 0; i++ {
av, bv := a[i].UnixNano(), b[0].UnixNano()
// Value in a is greater than B, we need to merge
if av > bv {
a[i], b[j] = b[j], a[i]
// Save value in a
temp := a[i]

// Overwrite a with b
a[i] = b[0]

// Slide all values of b down 1
copy(b, b[1:])
b = b[:len(b)-1]

// See where value we save from a should be inserted in b to keep b sorted
k := sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })

if k == len(b) {
// Last position?
b = append(b, temp)
} else if b[k].UnixNano() != temp.UnixNano() {
// Save the last element, since it will get overwritten
last := b[len(b)-1]
// Somewhere in the middle of b, insert it only if it's not a duplicate
copy(b[k+1:], b[k:])
// Add the last vale to the end
b = append(b, last)
b[k] = temp
}
} else if av == bv {
a[i] = b[j]
j++
// Value in a an b are the same, use b
a[i] = b[0]
b = b[1:]
}
}

if i >= len(a) {
if j+1 < len(b) && b[j].UnixNano() == b[j+1].UnixNano() {
j++
}
return append(a, b[j:]...)
if len(b) > 0 {
return append(a, b...)
}

return a
}

Expand Down Expand Up @@ -828,24 +889,45 @@ func (a IntegerValues) Merge(b IntegerValues) IntegerValues {
return a
}

var i, j int
for ; i < len(a) && j < len(b); i++ {
av, bv := a[i].UnixNano(), b[j].UnixNano()
for i := 0; i < len(a) && len(b) > 0; i++ {
av, bv := a[i].UnixNano(), b[0].UnixNano()
// Value in a is greater than B, we need to merge
if av > bv {
a[i], b[j] = b[j], a[i]
// Save value in a
temp := a[i]

// Overwrite a with b
a[i] = b[0]

// Slide all values of b down 1
copy(b, b[1:])
b = b[:len(b)-1]

// See where value we save from a should be inserted in b to keep b sorted
k := sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })

if k == len(b) {
// Last position?
b = append(b, temp)
} else if b[k].UnixNano() != temp.UnixNano() {
// Save the last element, since it will get overwritten
last := b[len(b)-1]
// Somewhere in the middle of b, insert it only if it's not a duplicate
copy(b[k+1:], b[k:])
// Add the last vale to the end
b = append(b, last)
b[k] = temp
}
} else if av == bv {
a[i] = b[j]
j++
// Value in a an b are the same, use b
a[i] = b[0]
b = b[1:]
}
}

if i >= len(a) {
if j+1 < len(b) && b[j].UnixNano() == b[j+1].UnixNano() {
j++
}
return append(a, b[j:]...)
if len(b) > 0 {
return append(a, b...)
}

return a
}

Expand Down Expand Up @@ -1003,24 +1085,45 @@ func (a StringValues) Merge(b StringValues) StringValues {
return a
}

var i, j int
for ; i < len(a) && j < len(b); i++ {
av, bv := a[i].UnixNano(), b[j].UnixNano()
for i := 0; i < len(a) && len(b) > 0; i++ {
av, bv := a[i].UnixNano(), b[0].UnixNano()
// Value in a is greater than B, we need to merge
if av > bv {
a[i], b[j] = b[j], a[i]
// Save value in a
temp := a[i]

// Overwrite a with b
a[i] = b[0]

// Slide all values of b down 1
copy(b, b[1:])
b = b[:len(b)-1]

// See where value we save from a should be inserted in b to keep b sorted
k := sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })

if k == len(b) {
// Last position?
b = append(b, temp)
} else if b[k].UnixNano() != temp.UnixNano() {
// Save the last element, since it will get overwritten
last := b[len(b)-1]
// Somewhere in the middle of b, insert it only if it's not a duplicate
copy(b[k+1:], b[k:])
// Add the last vale to the end
b = append(b, last)
b[k] = temp
}
} else if av == bv {
a[i] = b[j]
j++
// Value in a an b are the same, use b
a[i] = b[0]
b = b[1:]
}
}

if i >= len(a) {
if j+1 < len(b) && b[j].UnixNano() == b[j+1].UnixNano() {
j++
}
return append(a, b[j:]...)
if len(b) > 0 {
return append(a, b...)
}

return a
}

Expand Down
Loading