-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
Copy pathbuffer_util.go
160 lines (145 loc) · 4.79 KB
/
buffer_util.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package sql
import (
"context"
"fmt"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/rowcontainer"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/mon"
)
// rowContainerHelper is a wrapper around a disk-backed row container that
// should be used by planNodes (or similar components) whenever they need to
// buffer data. init must be called before the first use, and close must be
// called once the helper is no longer needed.
type rowContainerHelper struct {
// rows is the underlying container. It is set by init and reset to nil by
// close.
rows *rowcontainer.DiskBackedRowContainer
// scratch is a reusable row, allocated by init with one slot per column
// type, that addRow and addRowWithDedup fill with the datums of the row
// being added.
scratch rowenc.EncDatumRow
// memMonitor and diskMonitor are created by init, handed to rows, and
// stopped by close.
memMonitor *mon.BytesMonitor
diskMonitor *mon.BytesMonitor
}
// init prepares the helper for buffering rows of the given column types. It
// creates the "<opName>-limited" monitor (via
// execinfra.NewLimitedMonitorNoFlowCtx) and the "<opName>-disk" monitor (a
// child of the server's ParentDiskMonitor) and initializes the underlying
// disk-backed container with them.
//
// If enableDeduplication is true, the container is ordered on every column in
// ascending direction and DoDeDuplicate is called on it; this is required in
// order to use addRowWithDedup.
func (c *rowContainerHelper) init(
	typs []*types.T, evalContext *extendedEvalContext, opName string, enableDeduplication bool,
) {
	cfg := &evalContext.DistSQLPlanner.distSQLSrv.ServerConfig
	c.memMonitor = execinfra.NewLimitedMonitorNoFlowCtx(
		evalContext.Context, evalContext.Mon, cfg, evalContext.SessionData(),
		fmt.Sprintf("%s-limited", opName),
	)
	c.diskMonitor = execinfra.NewMonitor(
		evalContext.Context, cfg.ParentDiskMonitor, fmt.Sprintf("%s-disk", opName),
	)
	// The scratch row is reused by every addRow/addRowWithDedup call.
	c.scratch = make(rowenc.EncDatumRow, len(typs))

	// When deduplicating, all columns participate in the ordering.
	ordering := colinfo.NoOrdering
	if enableDeduplication {
		ordering = make(colinfo.ColumnOrdering, len(typs))
		for idx := range typs {
			col := &ordering[idx]
			col.ColIdx = idx
			col.Direction = encoding.Ascending
		}
	}

	c.rows = &rowcontainer.DiskBackedRowContainer{}
	c.rows.Init(
		ordering, typs, &evalContext.EvalContext,
		cfg.TempStorage, c.memMonitor, c.diskMonitor,
	)
	if enableDeduplication {
		c.rows.DoDeDuplicate()
	}
}
// addRow buffers the given row in the container. The datums are placed into
// the helper's scratch row, which is then passed to the underlying container.
func (c *rowContainerHelper) addRow(ctx context.Context, row tree.Datums) error {
	for idx, datum := range row {
		c.scratch[idx].Datum = datum
	}
	return c.rows.AddRow(ctx, c.scratch)
}
// addRowWithDedup buffers the given row unless it is already present in the
// container. ok reports whether the row was actually added, which is
// determined by checking whether the container's length grew. To use this
// method, init must have been called with enableDeduplication=true.
func (c *rowContainerHelper) addRowWithDedup(
	ctx context.Context, row tree.Datums,
) (ok bool, _ error) {
	for idx, datum := range row {
		c.scratch[idx].Datum = datum
	}
	prevLen := c.rows.Len()
	_, err := c.rows.AddRowWithDeDup(ctx, c.scratch)
	if err != nil {
		return false, err
	}
	// The row was new if and only if the container grew.
	ok = c.rows.Len() > prevLen
	return ok, nil
}
// len reports how many rows the underlying container currently holds.
func (c *rowContainerHelper) len() int {
	numRows := c.rows.Len()
	return numRows
}
// clear makes the helper ready to be reused: the underlying container is
// reset, which deletes all buffered data. After the reset the container uses
// the in-memory variant again even if it spilled during the previous usage.
func (c *rowContainerHelper) clear(ctx context.Context) error {
	if err := c.rows.UnsafeReset(ctx); err != nil {
		return err
	}
	return nil
}
// close releases the resources held by the helper: it closes the underlying
// container and stops both monitors. It must be called once the helper is no
// longer needed. It is a no-op when the container is nil, so calling it again
// after a previous close is harmless.
func (c *rowContainerHelper) close(ctx context.Context) {
	if c.rows == nil {
		return
	}
	c.rows.Close(ctx)
	c.memMonitor.Stop(ctx)
	c.diskMonitor.Stop(ctx)
	// Mark the helper as closed so that subsequent calls do nothing.
	c.rows = nil
}
// rowContainerIterator is a wrapper around rowcontainer.RowIterator that takes
// care of advancing the underlying iterator and converting the rows to
// tree.Datums.
type rowContainerIterator struct {
// iter is the underlying iterator; it is advanced by next and closed by
// close.
iter rowcontainer.RowIterator
// typs are the column types passed to rowenc.EncDatumRowToDatums when
// converting a row.
typs []*types.T
// datums is the buffer each row is converted into. next returns this same
// slice on every call (presumably overwriting the previous row's contents).
datums tree.Datums
// da is the datum allocator passed to rowenc.EncDatumRowToDatums.
da rowenc.DatumAlloc
}
// newRowContainerIterator creates an iterator over the rows buffered in c and
// rewinds it. typs are the types used to convert each row into tree.Datums.
// The returned iterator must be closed once it is no longer needed.
func newRowContainerIterator(
	ctx context.Context, c rowContainerHelper, typs []*types.T,
) *rowContainerIterator {
	rowIter := c.rows.NewIterator(ctx)
	rowIter.Rewind()
	return &rowContainerIterator{
		iter:   rowIter,
		typs:   typs,
		datums: make(tree.Datums, len(typs)),
	}
}
// next returns the row the iterator currently points at, converted to
// tree.Datums, and then advances the iterator. It returns nil, nil once the
// iterator has been exhausted, and nil plus the error if one is encountered.
// The returned slice is the iterator's own datums buffer, which is handed
// back on every call.
func (i *rowContainerIterator) next() (tree.Datums, error) {
	// Advance on every return path, but only after the current row has been
	// read and converted.
	defer i.iter.Next()

	valid, err := i.iter.Valid()
	if err != nil {
		return nil, err
	}
	if !valid {
		// All rows have been exhausted.
		return nil, nil
	}

	encRow, err := i.iter.Row()
	if err != nil {
		return nil, err
	}
	err = rowenc.EncDatumRowToDatums(i.typs, i.datums, encRow, &i.da)
	if err != nil {
		return nil, err
	}
	return i.datums, nil
}
// close closes the underlying iterator. It must be called once the
// rowContainerIterator is no longer needed.
func (i *rowContainerIterator) close() {
i.iter.Close()
}