diff --git a/pkg/sql/distsqlrun/flow.go b/pkg/sql/distsqlrun/flow.go index 0b14fa420476..a2e076db23fe 100644 --- a/pkg/sql/distsqlrun/flow.go +++ b/pkg/sql/distsqlrun/flow.go @@ -626,6 +626,14 @@ func (f *Flow) Wait() { } } +// Releasable is an interface for objects than can be Released back into a +// memory pool when finished. +type Releasable interface { + // Release allows this object to be returned to a memory pool. Objects must + // not be used after Release is called. + Release() +} + // Cleanup should be called when the flow completes (after all processors and // mailboxes exited). func (f *Flow) Cleanup(ctx context.Context) { @@ -635,6 +643,11 @@ func (f *Flow) Cleanup(ctx context.Context) { // This closes the account and monitor opened in ServerImpl.setupFlow. f.EvalCtx.ActiveMemAcc.Close(ctx) f.EvalCtx.Stop(ctx) + for _, p := range f.processors { + if d, ok := p.(Releasable); ok { + d.Release() + } + } if log.V(1) { log.Infof(ctx, "cleaning up") } diff --git a/pkg/sql/distsqlrun/processors.go b/pkg/sql/distsqlrun/processors.go index adf754d79759..05a8210a0ea5 100644 --- a/pkg/sql/distsqlrun/processors.go +++ b/pkg/sql/distsqlrun/processors.go @@ -59,11 +59,12 @@ type ProcOutputHelper struct { rowAlloc sqlbase.EncDatumRowAlloc filter *exprHelper - // renderExprs is set if we have a rendering. Only one of renderExprs and - // outputCols can be set. + // renderExprs has length > 0 if we have a rendering. Only one of renderExprs + // and outputCols can be set. renderExprs []exprHelper - // outputCols is set if we have a projection. Only one of renderExprs and - // outputCols can be set. + // outputCols is non-nil if we have a projection. Only one of renderExprs and + // outputCols can be set. Note that 0-length projections are possible, in + // which case outputCols will be 0-length but non-nil. outputCols []uint32 outputRow sqlbase.EncDatumRow @@ -87,6 +88,14 @@ type ProcOutputHelper struct { rowIdx uint64 } +// Reset resets this ProcOutputHelper, retaining allocated memory in its slices. +func (h *ProcOutputHelper) Reset() { + *h = ProcOutputHelper{ + renderExprs: h.renderExprs[:0], + outputTypes: h.outputTypes[:0], + } +} + // Init sets up a ProcOutputHelper. The types describe the internal schema of // the processor (as described for each processor core spec); they can be // omitted if there is no filtering expression. @@ -120,14 +129,28 @@ func (h *ProcOutputHelper) Init( // nil indicates no projection; use an empty slice. h.outputCols = make([]uint32, 0) } - h.outputTypes = make([]sqlbase.ColumnType, len(h.outputCols)) + nOutputCols := len(h.outputCols) + if cap(h.outputTypes) >= nOutputCols { + h.outputTypes = h.outputTypes[:nOutputCols] + } else { + h.outputTypes = make([]sqlbase.ColumnType, nOutputCols) + } for i, c := range h.outputCols { h.outputTypes[i] = types[c] } - } else if len(post.RenderExprs) > 0 { - h.renderExprs = make([]exprHelper, len(post.RenderExprs)) - h.outputTypes = make([]sqlbase.ColumnType, len(post.RenderExprs)) + } else if nRenders := len(post.RenderExprs); nRenders > 0 { + if cap(h.renderExprs) >= nRenders { + h.renderExprs = h.renderExprs[:nRenders] + } else { + h.renderExprs = make([]exprHelper, nRenders) + } + if cap(h.outputTypes) >= nRenders { + h.outputTypes = h.outputTypes[:nRenders] + } else { + h.outputTypes = make([]sqlbase.ColumnType, nRenders) + } for i, expr := range post.RenderExprs { + h.renderExprs[i] = exprHelper{} if err := h.renderExprs[i].init(expr, types, evalCtx); err != nil { return err } @@ -138,9 +161,15 @@ func (h *ProcOutputHelper) Init( h.outputTypes[i] = colTyp } } else { - h.outputTypes = types + // No rendering or projection. + if cap(h.outputTypes) >= len(types) { + h.outputTypes = h.outputTypes[:len(types)] + } else { + h.outputTypes = make([]sqlbase.ColumnType, len(types)) + } + copy(h.outputTypes, types) } - if h.outputCols != nil || h.renderExprs != nil { + if h.outputCols != nil || len(h.renderExprs) > 0 { // We're rendering or projecting, so allocate an output row. h.outputRow = h.rowAlloc.AllocRow(len(h.outputTypes)) } @@ -158,7 +187,7 @@ func (h *ProcOutputHelper) Init( // neededColumns calculates the set of internal processor columns that are // actually used by the post-processing stage. func (h *ProcOutputHelper) neededColumns() (colIdxs util.FastIntSet) { - if h.outputCols == nil && h.renderExprs == nil { + if h.outputCols == nil && len(h.renderExprs) == 0 { // No projection or rendering; all columns are needed. colIdxs.AddRange(0, h.numInternalCols-1) return colIdxs @@ -177,12 +206,10 @@ func (h *ProcOutputHelper) neededColumns() (colIdxs util.FastIntSet) { } // See if render expressions require this column. - if h.renderExprs != nil { - for j := range h.renderExprs { - if h.renderExprs[j].vars.IndexedVarUsed(i) { - colIdxs.Add(i) - break - } + for j := range h.renderExprs { + if h.renderExprs[j].vars.IndexedVarUsed(i) { + colIdxs.Add(i) + break } } } @@ -345,7 +372,7 @@ func (h *ProcOutputHelper) ProcessRow( return nil, true, nil } - if h.renderExprs != nil { + if len(h.renderExprs) > 0 { // Rendering. for i := range h.renderExprs { datum, err := h.renderExprs[i].eval(row) @@ -539,6 +566,16 @@ type ProcessorBase struct { inputsToDrain []RowSource } +// Reset resets this ProcessorBase, retaining allocated memory in slices. +func (pb *ProcessorBase) Reset() { + pb.out.Reset() + *pb = ProcessorBase{ + out: pb.out, + trailingMeta: pb.trailingMeta[:0], + inputsToDrain: pb.inputsToDrain[:0], + } +} + // procState represents the standard states that a processor can be in. These // states are relevant when the processor is using the draining utilities in // ProcessorBase. diff --git a/pkg/sql/distsqlrun/tablereader.go b/pkg/sql/distsqlrun/tablereader.go index 04b8bd6bbceb..7d0ddf757fb1 100644 --- a/pkg/sql/distsqlrun/tablereader.go +++ b/pkg/sql/distsqlrun/tablereader.go @@ -20,6 +20,8 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pkg/errors" + "sync" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/util" @@ -52,6 +54,12 @@ var _ RowSource = &tableReader{} const tableReaderProcName = "table reader" +var trPool = sync.Pool{ + New: func() interface{} { + return &tableReader{} + }, +} + // newTableReader creates a tableReader. func newTableReader( flowCtx *FlowCtx, @@ -64,7 +72,7 @@ func newTableReader( return nil, errors.Errorf("attempting to create a tableReader with uninitialized NodeID") } - tr := &tableReader{} + tr := trPool.Get().(*tableReader) tr.limitHint = limitHint(spec.LimitHint, post) @@ -109,7 +117,12 @@ func newTableReader( return nil, err } - tr.spans = make(roachpb.Spans, len(spec.Spans)) + nSpans := len(spec.Spans) + if cap(tr.spans) >= nSpans { + tr.spans = tr.spans[:nSpans] + } else { + tr.spans = make(roachpb.Spans, nSpans) + } for i, s := range spec.Spans { tr.spans[i] = s.Span } @@ -241,6 +254,18 @@ func (tr *tableReader) Start(ctx context.Context) context.Context { return ctx } +// Release releases this tableReader back to the pool. +func (tr *tableReader) Release() { + tr.ProcessorBase.Reset() + tr.fetcher.Reset() + *tr = tableReader{ + ProcessorBase: tr.ProcessorBase, + fetcher: tr.fetcher, + spans: tr.spans[:0], + } + trPool.Put(tr) +} + // Next is part of the RowSource interface. func (tr *tableReader) Next() (sqlbase.EncDatumRow, *ProducerMetadata) { for tr.State == StateRunning {