Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Point slice pool fixes #958

Merged
merged 2 commits into from
Jun 10, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions api/dataprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,20 @@ func Fix(in []schema.Point, from, to, interval uint32) []schema.Point {
// the requested range is too narrow for the requested interval
return []schema.Point{}
}
out := make([]schema.Point, (last-first)/interval+1)
// 3 attempts to get a sufficiently sized slice from the pool. if it fails, allocate a new one.
var out []schema.Point
neededCap := int((last-first)/interval + 1)
for attempt := 1; attempt < 4; attempt++ {
candidate := pointSlicePool.Get().([]schema.Point)
if cap(candidate) >= neededCap {
out = candidate[:neededCap]
break
}
pointSlicePool.Put(candidate)
}
if out == nil {
out = make([]schema.Point, neededCap)
}

// i iterates in. o iterates out. t is the ts we're looking to fill.
for t, i, o := first, 0, -1; t <= last; t += interval {
Expand Down Expand Up @@ -104,11 +117,13 @@ func Fix(in []schema.Point, from, to, interval uint32) []schema.Point {
o -= 1
}
}
pointSlicePool.Put(in[:0])
Copy link
Contributor

@DanCech DanCech Jun 29, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems backwards to me, I'd have thought that the caller would be responsible for returning in to the pool, since this function has no way of knowing whether the caller intends to continue using it.

Similarly, it seems like the caller should provide out so that it can be explicitly responsible for both getting it from the pool and cleaning it up afterwards.

Finally, is there a reason that we don't put the retry and allocation logic into pointSlicePool?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd have thought that the caller would be responsible for returning in to the pool.
Similarly, it seems like the caller should provide out so that it can be explicitly responsible for both getting it from the pool and cleaning it up afterwards.

we can do that. I definitely think the pool interaction for in and out should be consistent (either both in caller, or both within Fix).
so this would make Fix a more pure utility function, which is good. but the downside is that pulling the neededCap := int((last-first)/interval + 1) calculation out of Fix is a bit weird.

Finally, is there a reason that we don't put the retry and allocation logic into pointSlicePool?

this makes sense. sidenote: I've been thinking of having separate size classes, that sort of stuff would all make sense if we make pointSlicePool a more full-featured "object" rather than merely a sync.Pool instance

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, that does make moving out outside a little tough


return out
}

// divideContext wraps a divide() call with a context.Context condition
// important: pointsB will be released to the pool. do not keep a reference to it
func divideContext(ctx context.Context, pointsA, pointsB []schema.Point) []schema.Point {
select {
case <-ctx.Done():
Expand All @@ -119,13 +134,16 @@ func divideContext(ctx context.Context, pointsA, pointsB []schema.Point) []schem
return divide(pointsA, pointsB)
}

// divide performs a pointwise division of pointsA by pointsB. The result is
// written in place into pointsA, which is also the returned slice.
// important: pointsB will be released to the pool. do not keep a reference to it
func divide(pointsA, pointsB []schema.Point) []schema.Point {
	if len(pointsA) != len(pointsB) {
		panic(fmt.Errorf("divide of a series with len %d by a series with len %d", len(pointsA), len(pointsB)))
	}
	// both series are equally long, so indexing via pointsB is safe here
	for i := 0; i < len(pointsB); i++ {
		pointsA[i].Val /= pointsB[i].Val
	}
	pointSlicePool.Put(pointsB[:0])
	return pointsA
}

Expand Down Expand Up @@ -397,7 +415,10 @@ func (s *Server) getSeriesFixed(ctx context.Context, req models.Req, consolidato
return nil, nil
default:
}
res.Points = append(s.itersToPoints(rctx, res.Iters), res.Points...)
res.Points = append(s.itersToPoints(rctx, res.Iters), res.Points...) // TODO the output from s.itersToPoints is never released to the pool?
// note: Fix() returns res.Points back to the pool
// this is safe because nothing else is still using it
// you can confirm this by analyzing what happens in prior calls such as itersToPoints and s.getSeries()
return Fix(res.Points, req.From, req.To, req.ArchInterval), nil
}

Expand Down Expand Up @@ -589,6 +610,9 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]tsz.
}

// check for duplicate series names for the same query. If found merge the results.
// each first uniquely-identified series's backing datapoints slice is reused
// any subsequent non-uniquely-identified series is merged into the former and has its
// datapoints slice returned to the pool.
func mergeSeries(in []models.Series) []models.Series {
type segment struct {
target string
Expand Down Expand Up @@ -626,6 +650,9 @@ func mergeSeries(in []models.Series) []models.Series {
}
}
}
for j := 1; j < len(series); j++ {
pointSlicePool.Put(series[j].Datapoints[:0])
}
merged[i] = series[0]
}
i++
Expand Down