Skip to content

Commit

Permalink
Merge #30676
Browse files Browse the repository at this point in the history
30676: distsqlrun: pool TableReaders r=jordanlewis a=jordanlewis

And permit ProcOutputHelper and ProcessorBase to be reset without
throwing away all slice memory.

Extracted from #30556.

Release note: None

Co-authored-by: Jordan Lewis <jordanthelewis@gmail.com>
  • Loading branch information
craig[bot] and jordanlewis committed Oct 2, 2018
2 parents d7d7cb6 + b5fd051 commit 4e60d58
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 20 deletions.
13 changes: 13 additions & 0 deletions pkg/sql/distsqlrun/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,14 @@ func (f *Flow) Wait() {
}
}

// Releasable is an interface for objects than can be Released back into a
// memory pool when finished.
type Releasable interface {
// Release allows this object to be returned to a memory pool. Objects must
// not be used after Release is called.
Release()
}

// Cleanup should be called when the flow completes (after all processors and
// mailboxes exited).
func (f *Flow) Cleanup(ctx context.Context) {
Expand All @@ -635,6 +643,11 @@ func (f *Flow) Cleanup(ctx context.Context) {
// This closes the account and monitor opened in ServerImpl.setupFlow.
f.EvalCtx.ActiveMemAcc.Close(ctx)
f.EvalCtx.Stop(ctx)
for _, p := range f.processors {
if d, ok := p.(Releasable); ok {
d.Release()
}
}
if log.V(1) {
log.Infof(ctx, "cleaning up")
}
Expand Down
73 changes: 55 additions & 18 deletions pkg/sql/distsqlrun/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,12 @@ type ProcOutputHelper struct {
rowAlloc sqlbase.EncDatumRowAlloc

filter *exprHelper
// renderExprs is set if we have a rendering. Only one of renderExprs and
// outputCols can be set.
// renderExprs has length > 0 if we have a rendering. Only one of renderExprs
// and outputCols can be set.
renderExprs []exprHelper
// outputCols is set if we have a projection. Only one of renderExprs and
// outputCols can be set.
// outputCols is non-nil if we have a projection. Only one of renderExprs and
// outputCols can be set. Note that 0-length projections are possible, in
// which case outputCols will be 0-length but non-nil.
outputCols []uint32

outputRow sqlbase.EncDatumRow
Expand All @@ -87,6 +88,14 @@ type ProcOutputHelper struct {
rowIdx uint64
}

// Reset resets this ProcOutputHelper, retaining allocated memory in its slices.
func (h *ProcOutputHelper) Reset() {
*h = ProcOutputHelper{
renderExprs: h.renderExprs[:0],
outputTypes: h.outputTypes[:0],
}
}

// Init sets up a ProcOutputHelper. The types describe the internal schema of
// the processor (as described for each processor core spec); they can be
// omitted if there is no filtering expression.
Expand Down Expand Up @@ -120,14 +129,28 @@ func (h *ProcOutputHelper) Init(
// nil indicates no projection; use an empty slice.
h.outputCols = make([]uint32, 0)
}
h.outputTypes = make([]sqlbase.ColumnType, len(h.outputCols))
nOutputCols := len(h.outputCols)
if cap(h.outputTypes) >= nOutputCols {
h.outputTypes = h.outputTypes[:nOutputCols]
} else {
h.outputTypes = make([]sqlbase.ColumnType, nOutputCols)
}
for i, c := range h.outputCols {
h.outputTypes[i] = types[c]
}
} else if len(post.RenderExprs) > 0 {
h.renderExprs = make([]exprHelper, len(post.RenderExprs))
h.outputTypes = make([]sqlbase.ColumnType, len(post.RenderExprs))
} else if nRenders := len(post.RenderExprs); nRenders > 0 {
if cap(h.renderExprs) >= nRenders {
h.renderExprs = h.renderExprs[:nRenders]
} else {
h.renderExprs = make([]exprHelper, nRenders)
}
if cap(h.outputTypes) >= nRenders {
h.outputTypes = h.outputTypes[:nRenders]
} else {
h.outputTypes = make([]sqlbase.ColumnType, nRenders)
}
for i, expr := range post.RenderExprs {
h.renderExprs[i] = exprHelper{}
if err := h.renderExprs[i].init(expr, types, evalCtx); err != nil {
return err
}
Expand All @@ -138,9 +161,15 @@ func (h *ProcOutputHelper) Init(
h.outputTypes[i] = colTyp
}
} else {
h.outputTypes = types
// No rendering or projection.
if cap(h.outputTypes) >= len(types) {
h.outputTypes = h.outputTypes[:len(types)]
} else {
h.outputTypes = make([]sqlbase.ColumnType, len(types))
}
copy(h.outputTypes, types)
}
if h.outputCols != nil || h.renderExprs != nil {
if h.outputCols != nil || len(h.renderExprs) > 0 {
// We're rendering or projecting, so allocate an output row.
h.outputRow = h.rowAlloc.AllocRow(len(h.outputTypes))
}
Expand All @@ -158,7 +187,7 @@ func (h *ProcOutputHelper) Init(
// neededColumns calculates the set of internal processor columns that are
// actually used by the post-processing stage.
func (h *ProcOutputHelper) neededColumns() (colIdxs util.FastIntSet) {
if h.outputCols == nil && h.renderExprs == nil {
if h.outputCols == nil && len(h.renderExprs) == 0 {
// No projection or rendering; all columns are needed.
colIdxs.AddRange(0, h.numInternalCols-1)
return colIdxs
Expand All @@ -177,12 +206,10 @@ func (h *ProcOutputHelper) neededColumns() (colIdxs util.FastIntSet) {
}

// See if render expressions require this column.
if h.renderExprs != nil {
for j := range h.renderExprs {
if h.renderExprs[j].vars.IndexedVarUsed(i) {
colIdxs.Add(i)
break
}
for j := range h.renderExprs {
if h.renderExprs[j].vars.IndexedVarUsed(i) {
colIdxs.Add(i)
break
}
}
}
Expand Down Expand Up @@ -345,7 +372,7 @@ func (h *ProcOutputHelper) ProcessRow(
return nil, true, nil
}

if h.renderExprs != nil {
if len(h.renderExprs) > 0 {
// Rendering.
for i := range h.renderExprs {
datum, err := h.renderExprs[i].eval(row)
Expand Down Expand Up @@ -539,6 +566,16 @@ type ProcessorBase struct {
inputsToDrain []RowSource
}

// Reset resets this ProcessorBase, retaining allocated memory in slices.
func (pb *ProcessorBase) Reset() {
pb.out.Reset()
*pb = ProcessorBase{
out: pb.out,
trailingMeta: pb.trailingMeta[:0],
inputsToDrain: pb.inputsToDrain[:0],
}
}

// procState represents the standard states that a processor can be in. These
// states are relevant when the processor is using the draining utilities in
// ProcessorBase.
Expand Down
29 changes: 27 additions & 2 deletions pkg/sql/distsqlrun/tablereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"

"sync"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util"
Expand Down Expand Up @@ -52,6 +54,12 @@ var _ RowSource = &tableReader{}

const tableReaderProcName = "table reader"

var trPool = sync.Pool{
New: func() interface{} {
return &tableReader{}
},
}

// newTableReader creates a tableReader.
func newTableReader(
flowCtx *FlowCtx,
Expand All @@ -64,7 +72,7 @@ func newTableReader(
return nil, errors.Errorf("attempting to create a tableReader with uninitialized NodeID")
}

tr := &tableReader{}
tr := trPool.Get().(*tableReader)

tr.limitHint = limitHint(spec.LimitHint, post)

Expand Down Expand Up @@ -109,7 +117,12 @@ func newTableReader(
return nil, err
}

tr.spans = make(roachpb.Spans, len(spec.Spans))
nSpans := len(spec.Spans)
if cap(tr.spans) >= nSpans {
tr.spans = tr.spans[:nSpans]
} else {
tr.spans = make(roachpb.Spans, nSpans)
}
for i, s := range spec.Spans {
tr.spans[i] = s.Span
}
Expand Down Expand Up @@ -241,6 +254,18 @@ func (tr *tableReader) Start(ctx context.Context) context.Context {
return ctx
}

// Release releases this tableReader back to the pool.
func (tr *tableReader) Release() {
tr.ProcessorBase.Reset()
tr.fetcher.Reset()
*tr = tableReader{
ProcessorBase: tr.ProcessorBase,
fetcher: tr.fetcher,
spans: tr.spans[:0],
}
trPool.Put(tr)
}

// Next is part of the RowSource interface.
func (tr *tableReader) Next() (sqlbase.EncDatumRow, *ProducerMetadata) {
for tr.State == StateRunning {
Expand Down

0 comments on commit 4e60d58

Please sign in to comment.