Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

intervalccl: optimize OverlapCoveringMerge #45139

Merged
merged 1 commit into from
Mar 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions pkg/sql/covering/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package covering

import (
"encoding/binary"
"fmt"
"math/rand"
"testing"

"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

// BenchmarkOverlapCoveringMerge measures OverlapCoveringMerge on synthetic
// inputs. Each input is a stack of "layers" (one Covering per layer), and each
// layer is a run of contiguous ranges [j, j+step) covering the key space
// [0, elementsPerLayer) with a per-layer random step in [1, 10].
//
// One sub-benchmark is run for every (numLayers, elementsPerLayer)
// combination, named "layers=<numLayers>,elems=<elementsPerLayer>".
func BenchmarkOverlapCoveringMerge(b *testing.B) {
	type benchCase struct {
		name   string
		inputs []Covering
	}

	// encodeKey renders v as a fixed-width big-endian key. Big-endian is
	// required here: it is the encoding for which bytewise comparison
	// (bytes.Compare) agrees with numeric order. With a little-endian encoding
	// 255 ([255 0 0 0]) sorts after 256 ([0 1 0 0]), which yields ranges whose
	// Start is greater than their End and which overlap within one covering.
	encodeKey := func(v int) []byte {
		key := make([]byte, 4)
		binary.BigEndian.PutUint32(key, uint32(v))
		return key
	}

	// Use a private generator instead of reseeding the process-wide source,
	// and log the seed so that a particular input shape can be reproduced.
	seed := timeutil.Now().Unix()
	b.Logf("random seed: %d", seed)
	rng := rand.New(rand.NewSource(seed))

	var benchmarks []benchCase
	for _, numLayers := range []int{
		1,      // single backup
		24,     // hourly backups
		24 * 7, // hourly backups for a week
	} {
		// Number of keys spanned by each backup instance.
		for _, elementsPerLayer := range []int{100, 1000, 10000} {
			inputs := make([]Covering, 0, numLayers)

			for i := 0; i < numLayers; i++ {
				step := 1 + rng.Intn(10)
				c := make(Covering, 0, (elementsPerLayer+step-1)/step)

				// The payload is simply the ordinal of the range within its layer.
				for j, payload := 0, 0; j < elementsPerLayer; j, payload = j+step, payload+1 {
					c = append(c, Range{
						Start:   encodeKey(j),
						End:     encodeKey(j + step),
						Payload: payload,
					})
				}
				inputs = append(inputs, c)
			}

			benchmarks = append(benchmarks, benchCase{
				name:   fmt.Sprintf("layers=%d,elems=%d", numLayers, elementsPerLayer),
				inputs: inputs,
			})
		}
	}

	// No ResetTimer is needed on the parent: every sub-benchmark started by
	// b.Run has its own timer, so the setup above is never measured.
	for _, bench := range benchmarks {
		inputs := bench.inputs
		b.Run(bench.name, func(b *testing.B) {
			var merged []Range
			for i := 0; i < b.N; i++ {
				merged = OverlapCoveringMerge(inputs)
			}
			// Validate once, outside the timed loop, so that the assertion's
			// overhead is not attributed to OverlapCoveringMerge.
			require.NotEmpty(b, merged)
		})
	}
}
216 changes: 131 additions & 85 deletions pkg/sql/covering/overlap_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package covering

import (
"bytes"
"reflect"
"sort"
)

Expand All @@ -22,21 +23,51 @@ type Range struct {
Payload interface{}
}

// Covering represents a non-overlapping, but possibly non-contiguous, set of
// intervals.
type Covering []Range
// endpoints is a collection of all start and end points.
type endpoints [][]byte

var _ sort.Interface = Covering{}
var _ sort.Interface = endpoints{}

func (c Covering) Len() int { return len(c) }
func (c Covering) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
func (c Covering) Less(i, j int) bool {
if cmp := bytes.Compare(c[i].Start, c[j].Start); cmp != 0 {
return cmp < 0
}
return bytes.Compare(c[i].End, c[j].End) < 0
// Len reports how many endpoints are held in e.
func (e endpoints) Len() int { return len(e) }

// Less reports whether the endpoint at index i sorts strictly before the
// endpoint at index j under bytewise comparison.
func (e endpoints) Less(i, j int) bool {
	left, right := e[i], e[j]
	return bytes.Compare(left, right) < 0
}

// Swap exchanges the endpoints at indexes i and j.
func (e endpoints) Swap(i, j int) {
	tmp := e[i]
	e[i] = e[j]
	e[j] = tmp
}

// marker couples a payload with the index of the covering it was taken from,
// so that payloads can later be emitted in covering order.
type marker struct {
	payload       interface{}
	coveringIndex int
}

// markers is a list of marker values that sorts by coveringIndex.
type markers []marker

var _ sort.Interface = markers(nil)

// Len reports how many markers are held in m.
func (m markers) Len() int { return len(m) }

// Less reports whether the marker at index i comes from an earlier covering
// than the marker at index j.
func (m markers) Less(i, j int) bool {
	left, right := m[i].coveringIndex, m[j].coveringIndex
	return left < right
}

// Swap exchanges the markers at indexes i and j.
func (m markers) Swap(i, j int) {
	tmp := m[i]
	m[i] = m[j]
	m[j] = tmp
}

// Covering represents a non-overlapping, but possibly non-contiguous, set of
// intervals.
type Covering []Range

// OverlapCoveringMerge returns the set of intervals covering every range in the
// input such that no output range crosses an input endpoint. The payloads are
// returned as a `[]interface{}` and in the same order as they are in coverings.
Expand All @@ -49,97 +80,112 @@ func (c Covering) Less(i, j int) bool {
// The input is mutated (sorted). It is also assumed (and not checked) to be
// valid (e.g. non-overlapping intervals in each covering).
func OverlapCoveringMerge(coverings []Covering) []Range {
for _, covering := range coverings {
sort.Sort(covering)
}

// TODO(dan): Verify that the ranges in each covering are non-overlapping.

// Each covering is now sorted. Repeatedly iterate through the first range
// in each covering to find the next output range. Then remove the ranges
// that have been fully represented in the output from the front of each
// covering.
// We flatten all coverings onto a single line. If the total number of ranges
// across all coverings is N, there are at most 2*N endpoints. Flattening them
// onto a single line and sorting them takes O(N log N) time. We then pass over
// the endpoints one by one and append payloads.

// captures all endpoints (starts and ends from) collected from all coverings
var totalRange endpoints
//
// TODO(dan): This is O(number of coverings * total number of input ranges).
// The number of ranges in the output is O(total number of input ranges) and
// each has a payload that is O(number of coverings), so we can't do any
// better without changing the output representation. That said, constants
// matter so if this ever turns out to be too slow, we could sort all start
// and end points (each point becomes an "event" to either start or end a
// range) and scan them in order, maintaining a list of intervals that are
// currently "open"
var ret []Range
var previousEndKey []byte
for {
// Find the start key of the next range. It will either be the end key
// of the range just added to the output or the minimum start key
// remaining in the coverings (if there is a gap).
var startKey []byte
startKeySet := false
for _, covering := range coverings {
if len(covering) == 0 {
continue
numsMap := map[string]struct{}{}
// emptySets records empty intervals. They must be tracked separately
// because their start and end keys are equal, so duplicates have to
// be prevented.
emptySets := map[string]struct{}{}
// captures all start covering's endpoints with relevant payloads
startKeys := map[string]markers{}
// captures all end covering's endpoints with relevant payloads
endKeys := map[string]markers{}

for i, covering := range coverings {
for _, r := range covering {
startKeys[string(r.Start)] = append(startKeys[string(r.Start)], marker{
payload: r.Payload,
coveringIndex: i,
})
if _, exist := numsMap[string(r.Start)]; !exist {
totalRange = append(totalRange, r.Start)
numsMap[string(r.Start)] = struct{}{}
}
if !startKeySet || bytes.Compare(covering[0].Start, startKey) < 0 {
startKey = covering[0].Start
startKeySet = true
endKeys[string(r.End)] = append(endKeys[string(r.End)], marker{
payload: r.Payload,
coveringIndex: i,
})

if _, exist := numsMap[string(r.End)]; !exist {
totalRange = append(totalRange, r.End)
numsMap[string(r.End)] = struct{}{}
}
}
if !startKeySet {
break
}
if bytes.Compare(startKey, previousEndKey) < 0 {
startKey = previousEndKey
}

// Find the end key of the next range. It's the minimum of all end keys
// of ranges that intersect the start and all start keys of ranges after
// the end key of the range just added to the output.
var endKey []byte
endKeySet := false
for _, covering := range coverings {
if len(covering) == 0 {
// if start and end differs then it's normal interval and
// we can continue to the next one
if !bytes.Equal(r.Start, r.End) {
continue
}

if bytes.Compare(covering[0].Start, startKey) > 0 {
if !endKeySet || bytes.Compare(covering[0].Start, endKey) < 0 {
endKey = covering[0].Start
endKeySet = true
}
// otherwise it is an empty interval and we need to remember it
if _, exists := emptySets[string(r.Start)]; !exists {
totalRange = append(totalRange, r.End)
emptySets[string(r.Start)] = struct{}{}
}
if !endKeySet || bytes.Compare(covering[0].End, endKey) < 0 {
endKey = covering[0].End
endKeySet = true
}
}
sort.Sort(totalRange)

var prev []byte
var payloadsMarkers markers
var ret []Range

for it, next := range totalRange {
if len(prev) != 0 && len(payloadsMarkers) > 0 {
var payloads []interface{}
// make sure we preserve order of covers as we got them
sort.Sort(payloadsMarkers)

for _, marker := range payloadsMarkers {
payloads = append(payloads, marker.payload)
}

ret = append(ret, Range{
Start: prev,
End: next,
Payload: payloads,
})
}

// Collect all payloads of ranges that intersect the start and end keys
// just selected. Also trim any ranges with an end key <= the one just
// selected, they will not be output after this.
var payloads []interface{}
for i := range coverings {
// Because of how we chose startKey and endKey, we know that
// coverings[i][0].End >= endKey and that coverings[i][0].Start is
// either <= startKey or >= endKey.

for len(coverings[i]) > 0 {
if bytes.Compare(coverings[i][0].Start, startKey) > 0 {
break
if removeMarkers, ok := endKeys[string(next)]; ok {
for _, marker := range removeMarkers {
var index = -1
for i, p := range payloadsMarkers {
if reflect.DeepEqual(p.payload, marker.payload) {
index = i
break
}
}
payloads = append(payloads, coverings[i][0].Payload)
if !bytes.Equal(coverings[i][0].End, endKey) {
break
if index != -1 { // if found remove
payloadsMarkers = append(payloadsMarkers[:index], payloadsMarkers[index+1:]...)
}
coverings[i] = coverings[i][1:]
}
}

ret = append(ret, Range{
Start: startKey,
End: endKey,
Payload: payloads,
})
previousEndKey = endKey
// we hit empty set, no need to add anything, since it was
// already added during previous iteration
if bytes.Equal(prev, next) && it > 0 {
// The it > 0 check handles the case where the start and end
// of an empty set are zero-length slices: such a key compares
// equal to the initial (nil) value of prev, so without the
// check the very first endpoint would be skipped.
continue
}

if addMarkers, ok := startKeys[string(next)]; ok {
payloadsMarkers = append(payloadsMarkers, addMarkers...)
}

prev = next
}

return ret
Expand Down