Skip to content

Commit

Permalink
refactor: optimize computeWindowShards
Browse files Browse the repository at this point in the history
  • Loading branch information
novahe committed May 14, 2022
1 parent 5ce4113 commit a7a4866
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 28 deletions.
49 changes: 21 additions & 28 deletions pkg/providers/prom/ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,38 +347,31 @@ func (c *context) computeShards(query string, window *promapiv1.Range) *QuerySha
}
}

func ComputeWindowShards(window *promapiv1.Range, MaxPointsLimitPerTimeSeries int) []*promapiv1.Range {
possiblePoints := 0
for start := window.Start; start.Before(window.End) || start.Equal(window.End); start = start.Add(window.Step) {
possiblePoints++
}
var shardsNum int
if possiblePoints%MaxPointsLimitPerTimeSeries > 0 {
shardsNum = possiblePoints/MaxPointsLimitPerTimeSeries + 1
} else {
shardsNum = possiblePoints / MaxPointsLimitPerTimeSeries
}

s := window.Start
e := window.Start
func ComputeWindowShards(window *promapiv1.Range, maxPointsLimitPerTimeSeries int) []*promapiv1.Range {
shardIndex := 0
nextPoint := window.Start
prePoint := nextPoint
var shards []*promapiv1.Range
// assume the bound is a point
for i := 0; i < shardsNum; i++ {
width := time.Duration(MaxPointsLimitPerTimeSeries-1) * window.Step
e = e.Add(width)
if e.After(window.End) {
e = window.End
for {
if nextPoint.After(window.End) {
shards = append(shards, &promapiv1.Range{
Start: prePoint,
End: window.End,
Step: window.Step,
})
return shards
}
shardWindow := &promapiv1.Range{
Step: window.Step,
Start: s,
End: e,
if shardIndex != 0 && shardIndex%maxPointsLimitPerTimeSeries == 0 {
shards = append(shards, &promapiv1.Range{
Start: prePoint,
End: nextPoint.Add(-window.Step),
Step: window.Step,
})
prePoint = nextPoint
}
shards = append(shards, shardWindow)
// reset
s = e
nextPoint = nextPoint.Add(window.Step)
shardIndex++
}
return shards
}

// shard by time slice only, because we can not decide what the query is, how many time series it will return, it depends on the application level.
Expand Down
133 changes: 133 additions & 0 deletions pkg/providers/prom/ctx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,3 +524,136 @@ func EqualTimeSeries(tsList1, tsList2 []*common.TimeSeries) bool {
}
return true
}

func TestComputeWindowShards(t *testing.T) {
type args struct {
window *promapiv1.Range
maxPointsLimitPerTimeSeries int
}
now := time.Now().Truncate(time.Second)
tests := []struct {
name string
args args
want []*promapiv1.Range
}{
{
"len(timeWidow) == 1 < maxPointsLimitPerTimeSeries; input: [0,0], output: [0,0]",
args{
window: &promapiv1.Range{
Start: now,
End: now,
Step: time.Second,
},
maxPointsLimitPerTimeSeries: 10,
},
[]*promapiv1.Range{
{
Start: now,
End: now,
Step: time.Second,
},
},
},
{
"len(timeWidow) > maxPointsLimitPerTimeSeries; input: [0,10], output: [0,9] [10,10]",
args{
window: &promapiv1.Range{
Start: now,
End: now.Add(10 * time.Second),
Step: time.Second,
},
maxPointsLimitPerTimeSeries: 10,
},
[]*promapiv1.Range{
{
Start: now,
End: now.Add(9 * time.Second),
Step: time.Second,
},
{
Start: now.Add(10 * time.Second),
End: now.Add(10 * time.Second),
Step: time.Second,
},
},
},
{
"len(timeWidow) == maxPointsLimitPerTimeSeries; input: [0,9]; output: [0,9]",
args{
window: &promapiv1.Range{
Start: now,
End: now.Add(9 * time.Second),
Step: time.Second,
},
maxPointsLimitPerTimeSeries: 10,
},
[]*promapiv1.Range{
{
Start: now,
End: now.Add(9 * time.Second),
Step: time.Second,
},
},
},
{
"len(timeWidow) < maxPointsLimitPerTimeSeries; input: [0,5]; output: [0,5]",
args{
window: &promapiv1.Range{
Start: now,
End: now.Add(5 * time.Second),
Step: time.Second,
},
maxPointsLimitPerTimeSeries: 10,
},
[]*promapiv1.Range{
{
Start: now,
End: now.Add(5 * time.Second),
Step: time.Second,
},
},
},
{
"len(timeWidow) == 2*maxPointsLimitPerTimeSeries; input: [0,19], output: [0,9] [10,19]",
args{
window: &promapiv1.Range{
Start: now,
End: now.Add(19 * time.Second),
Step: time.Second,
},
maxPointsLimitPerTimeSeries: 10,
},
[]*promapiv1.Range{
{
Start: now,
End: now.Add(9 * time.Second),
Step: time.Second,
},
{
Start: now.Add(10 * time.Second),
End: now.Add(19 * time.Second),
Step: time.Second,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := ComputeWindowShards(tt.args.window, tt.args.maxPointsLimitPerTimeSeries)
if len(got) != len(tt.want) {
t.Errorf("window length = %v, want %v", len(got), len(tt.want))
}
for i, w := range tt.want {
if got[i].Start != w.Start {
t.Errorf("w.Start = %v, want %v", got[i].Start, w.Start)
}
if got[i].End != w.End {
t.Errorf("w.End = %v, want %v", got[i].End, w.End)
}
if got[i].Step != w.Step {
t.Errorf("w.Step = %v, want %v", got[i].Step, w.Step)
}
}
})
}
}

0 comments on commit a7a4866

Please sign in to comment.