Skip to content

Commit

Permalink
intervalccl: optimize OverlapCoveringMerge
Browse files Browse the repository at this point in the history
Change OverlapCoveringMerge so that, instead of iterating over the coverings
for each input range, it flattens the coverings, sorts the endpoints and
produces the output ranges in one pass. This reduces the total complexity
from O(nm) to O(n*log(n)).

Signed-off-by: Artem Barger <bartem@il.ibm.com>

Release note: none
  • Loading branch information
C0rWin committed Feb 27, 2020
1 parent eae5a92 commit 1a99f0f
Show file tree
Hide file tree
Showing 2 changed files with 225 additions and 85 deletions.
94 changes: 94 additions & 0 deletions pkg/sql/covering/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package covering

import (
"encoding/binary"
"math/rand"
"testing"

"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

func BenchmarkOverlapCoveringMerge(b *testing.B) {

benchmarks := []struct {
// benchmark test
name string
// rate we would like to "simulate" backups
backupRate int
// period which backups is going to cover during benchmark
backupPeriod int
// amount of coverings (ranges) we going to have per each period
coveringsPerPeriod int
}{
{
name: "OneDaysHourlyBackup_10000Coverings",
backupRate: 1,
backupPeriod: 1,
coveringsPerPeriod: 10000,
},
{
name: "TwoDaysHourlyBackup_10000Coverings",
backupRate: 1,
backupPeriod: 2,
coveringsPerPeriod: 10000,
},
{
name: "WeekHourlyBackup_10000Coverings",
backupRate: 1,
backupPeriod: 7,
coveringsPerPeriod: 10000,
},
}

for _, bm := range benchmarks {
b.Run(bm.name, func(b *testing.B) {
// Prepare benchmark input
b.StopTimer()
var inputs []Covering
rand.Seed(timeutil.Now().Unix())
backupsRate := 1 // hourly
backupPeriod := 2 // two days
coveringsPerPeriod := 10000

for i := 0; i < 24*backupsRate*backupPeriod; i++ {
var payload int
var c Covering
step := 1 + rand.Intn(10)

for j := 0; j < coveringsPerPeriod; j += step {
start := make([]byte, 4)
binary.LittleEndian.PutUint32(start, uint32(j))

end := make([]byte, 4)
binary.LittleEndian.PutUint32(end, uint32(j+step))

c = append(c, Range{
Start: start,
End: end,
Payload: payload,
})
payload++
}

inputs = append(inputs, c)
}

// Having input prepared next we run the benchmark itself
b.StartTimer()

for i := 0; i < b.N; i++ {
OverlapCoveringMerge(inputs)
}
})
}
}
216 changes: 131 additions & 85 deletions pkg/sql/covering/overlap_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package covering

import (
"bytes"
"reflect"
"sort"
)

Expand All @@ -22,21 +23,51 @@ type Range struct {
Payload interface{}
}

// Covering represents a non-overlapping, but possibly non-contiguous, set of
// intervals, stored as a slice of Range values.
type Covering []Range
// endpoints is a collection of range boundary keys (both start keys and end
// keys), each held as a byte slice.
type endpoints [][]byte

var _ sort.Interface = Covering{}
var _ sort.Interface = endpoints{}

// Len reports the number of ranges in the covering.
func (c Covering) Len() int {
	return len(c)
}
// Swap exchanges the ranges at positions i and j.
func (c Covering) Swap(i, j int) {
	tmp := c[i]
	c[i] = c[j]
	c[j] = tmp
}
func (c Covering) Less(i, j int) bool {
if cmp := bytes.Compare(c[i].Start, c[j].Start); cmp != 0 {
return cmp < 0
}
return bytes.Compare(c[i].End, c[j].End) < 0
// Len reports how many keys the collection holds.
func (e endpoints) Len() int { return len(e) }

// Less reports whether the key at position i sorts strictly before the key at
// position j in lexicographic byte order.
func (e endpoints) Less(i, j int) bool {
	left, right := e[i], e[j]
	return bytes.Compare(left, right) < 0
}

// Swap exchanges the keys at positions i and j.
func (e endpoints) Swap(i, j int) {
	tmp := e[i]
	e[i] = e[j]
	e[j] = tmp
}

// marker pairs a range payload with the index of the covering the range came
// from, so that payloads can later be ordered by covering.
type marker struct {
	payload       interface{}
	coveringIndex int
}

// markers is a list of marker values that sorts by covering index.
type markers []marker

// Compile-time check that markers implements sort.Interface.
var _ sort.Interface = markers(nil)

// Len reports the number of markers in the list.
func (m markers) Len() int { return len(m) }

// Less orders markers by ascending covering index; payloads are not compared.
func (m markers) Less(i, j int) bool {
	left, right := m[i], m[j]
	return left.coveringIndex < right.coveringIndex
}

// Swap exchanges the markers at positions i and j.
func (m markers) Swap(i, j int) {
	tmp := m[i]
	m[i] = m[j]
	m[j] = tmp
}

// Covering represents a non-overlapping, but possibly non-contiguous, set of
// intervals, stored as a slice of Range values.
type Covering []Range

// OverlapCoveringMerge returns the set of intervals covering every range in the
// input such that no output range crosses an input endpoint. The payloads are
// returned as a `[]interface{}` and in the same order as they are in coverings.
Expand All @@ -49,97 +80,112 @@ func (c Covering) Less(i, j int) bool {
// The input is mutated (sorted). It is also assumed (and not checked) to be
// valid (e.g. non-overlapping intervals in each covering).
func OverlapCoveringMerge(coverings []Covering) []Range {
for _, covering := range coverings {
sort.Sort(covering)
}

// TODO(dan): Verify that the ranges in each covering are non-overlapping.

// Each covering is now sorted. Repeatedly iterate through the first range
// in each covering to find the next output range. Then remove the ranges
// that have been fully represented in the output from the front of each
// covering.
// We would like to flatten all coverings onto a single line. Assume that the total
// number of ranges across all coverings is N; then there are at most 2*N endpoints.
// Flattening the coverings onto a single line and sorting the endpoints takes
// O(N log N) time. Afterwards we pass over the endpoints one by one and append payloads.

// captures all endpoints (starts and ends from) collected from all coverings
var totalRange endpoints
//
// TODO(dan): This is O(number of coverings * total number of input ranges).
// The number of ranges in the output is O(total number of input ranges) and
// each has a payload that is O(number of coverings), so we can't do any
// better without changing the output representation. That said, constants
// matter so if this ever turns out to be too slow, we could sort all start
// and end points (each point becomes an "event" to either start or end a
// range) and scan them in order, maintaining a list of intervals that are
// currently "open"
var ret []Range
var previousEndKey []byte
for {
// Find the start key of the next range. It will either be the end key
// of the range just added to the output or the minimum start key
// remaining in the coverings (if there is a gap).
var startKey []byte
startKeySet := false
for _, covering := range coverings {
if len(covering) == 0 {
continue
numsMap := map[string]struct{}{}
// emptySets records the keys of empty intervals (start equal to end). They
// need separate tracking because such an interval contributes the same key
// twice, and that extra endpoint must be added only once per key.
emptySets := map[string]struct{}{}
// captures all start covering's endpoints with relevant payloads
startKeys := map[string]markers{}
// captures all end covering's endpoints with relevant payloads
endKeys := map[string]markers{}

for i, covering := range coverings {
for _, r := range covering {
startKeys[string(r.Start)] = append(startKeys[string(r.Start)], marker{
payload: r.Payload,
coveringIndex: i,
})
if _, exist := numsMap[string(r.Start)]; !exist {
totalRange = append(totalRange, r.Start)
numsMap[string(r.Start)] = struct{}{}
}
if !startKeySet || bytes.Compare(covering[0].Start, startKey) < 0 {
startKey = covering[0].Start
startKeySet = true
endKeys[string(r.End)] = append(endKeys[string(r.End)], marker{
payload: r.Payload,
coveringIndex: i,
})

if _, exist := numsMap[string(r.End)]; !exist {
totalRange = append(totalRange, r.End)
numsMap[string(r.End)] = struct{}{}
}
}
if !startKeySet {
break
}
if bytes.Compare(startKey, previousEndKey) < 0 {
startKey = previousEndKey
}

// Find the end key of the next range. It's the minimum of all end keys
// of ranges that intersect the start and all start keys of ranges after
// the end key of the range just added to the output.
var endKey []byte
endKeySet := false
for _, covering := range coverings {
if len(covering) == 0 {
// if start and end differs then it's normal interval and
// we can continue to the next one
if !bytes.Equal(r.Start, r.End) {
continue
}

if bytes.Compare(covering[0].Start, startKey) > 0 {
if !endKeySet || bytes.Compare(covering[0].Start, endKey) < 0 {
endKey = covering[0].Start
endKeySet = true
}
// otherwise it is an empty interval and we need to remember it
if _, exists := emptySets[string(r.Start)]; !exists {
totalRange = append(totalRange, r.End)
emptySets[string(r.Start)] = struct{}{}
}
if !endKeySet || bytes.Compare(covering[0].End, endKey) < 0 {
endKey = covering[0].End
endKeySet = true
}
}
sort.Sort(totalRange)

var prev []byte
var payloadsMarkers markers
var ret []Range

for it, next := range totalRange {
if len(prev) != 0 && len(payloadsMarkers) > 0 {
var payloads []interface{}
// make sure we preserve order of covers as we got them
sort.Sort(payloadsMarkers)

for _, marker := range payloadsMarkers {
payloads = append(payloads, marker.payload)
}

ret = append(ret, Range{
Start: prev,
End: next,
Payload: payloads,
})
}

// Collect all payloads of ranges that intersect the start and end keys
// just selected. Also trim any ranges with an end key <= the one just
// selected, they will not be output after this.
var payloads []interface{}
for i := range coverings {
// Because of how we chose startKey and endKey, we know that
// coverings[i][0].End >= endKey and that coverings[i][0].Start is
// either <= startKey or >= endKey.

for len(coverings[i]) > 0 {
if bytes.Compare(coverings[i][0].Start, startKey) > 0 {
break
if removeMarkers, ok := endKeys[string(next)]; ok {
for _, marker := range removeMarkers {
var index = -1
for i, p := range payloadsMarkers {
if reflect.DeepEqual(p.payload, marker.payload) {
index = i
break
}
}
payloads = append(payloads, coverings[i][0].Payload)
if !bytes.Equal(coverings[i][0].End, endKey) {
break
if index != -1 { // if found remove
payloadsMarkers = append(payloadsMarkers[:index], payloadsMarkers[index+1:]...)
}
coverings[i] = coverings[i][1:]
}
}

ret = append(ret, Range{
Start: startKey,
End: endKey,
Payload: payloads,
})
previousEndKey = endKey
// we hit empty set, no need to add anything, since it was
// already added during previous iteration
if bytes.Equal(prev, next) && it > 0 {
// the check for it > 0 needed to take care
// of the case where start and end of an empty
// set presented with zero length slice, since
// this is a value prev and next both are initialized.
continue
}

if addMarkers, ok := startKeys[string(next)]; ok {
payloadsMarkers = append(payloadsMarkers, addMarkers...)
}

prev = next
}

return ret
Expand Down

0 comments on commit 1a99f0f

Please sign in to comment.