Skip to content

Commit

Permalink
Merge #38557 #38613
Browse files Browse the repository at this point in the history
38557: exec: protect against unset syncFlowConsumer r=jordanlewis a=asubiotto

This should never happen since it implies that the receiver isn't
connected correctly. These happen when a node sends a SetupFlow request
to a remote node where the spec specifies that the response is on that
remote node. We don't see panics in the row execution engine due to
wrapping the syncFlowConsumer with a copyingRowReceiver, but this state
can cause setupVectorized to panic.

This commit protects against this state pending further investigation.

Release note: None

38613: exec: Add support for projection of the IN operator r=rohany a=rohany

vectorized execution now supports statements of the form

```
select x, x IN (1,2,3) from t;
```

Co-authored-by: Alfonso Subiotto Marqués <alfonso@cockroachlabs.com>
Co-authored-by: Rohan Yadav <rohany@alumni.cmu.edu>
  • Loading branch information
3 people committed Jul 8, 2019
3 parents a9dd587 + 08cc37d + 7730365 commit 712c2db
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 9 deletions.
15 changes: 14 additions & 1 deletion pkg/sql/distsqlrun/column_exec_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,17 @@ func planProjectionExpr(
// The projection result will be outputted to a new column which is appended
// to the input batch.
resultIdx = len(ct)
op, err = exec.GetProjectionRConstOperator(typ, binOp, leftOp, leftIdx, rConstArg, resultIdx)
if binOp == tree.In || binOp == tree.NotIn {
negate := binOp == tree.NotIn
datumTuple, ok := tree.AsDTuple(rConstArg)
if !ok {
err = errors.Errorf("IN operator supported only on constant expressions")
return nil, resultIdx, ct, err
}
op, err = exec.GetInProjectionOperator(typ, leftOp, leftIdx, resultIdx, datumTuple, negate)
} else {
op, err = exec.GetProjectionRConstOperator(typ, binOp, leftOp, leftIdx, rConstArg, resultIdx)
}
ct = append(ct, *typ)
return op, resultIdx, ct, err
}
Expand Down Expand Up @@ -1086,6 +1096,9 @@ func (f *Flow) setupVectorized(ctx context.Context) error {
}
metadataSourcesQueue = metadataSourcesQueue[:0]
case distsqlpb.StreamEndpointSpec_SYNC_RESPONSE:
if f.syncFlowConsumer == nil {
return errors.New("syncFlowConsumer unset, unable to create materializer")
}
// Make the materializer, which will write to the given receiver.
columnTypes := f.syncFlowConsumer.Types()
outputToInputColIdx := make([]int, len(columnTypes))
Expand Down
77 changes: 77 additions & 0 deletions pkg/sql/exec/select_in_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,80 @@ func BenchmarkSelectInInt64(b *testing.B) {
}
}
}

func TestProjectInInt64(t *testing.T) {
testCases := []struct {
desc string
inputTuples tuples
outputTuples tuples
filterRow []int64
hasNulls bool
negate bool
}{
{
desc: "Simple in test",
inputTuples: tuples{{0}, {1}},
outputTuples: tuples{{true}, {true}},
filterRow: []int64{0, 1},
hasNulls: false,
negate: false,
},
{
desc: "Simple not in test",
inputTuples: tuples{{2}},
outputTuples: tuples{{true}},
filterRow: []int64{0, 1},
hasNulls: false,
negate: true,
},
{
desc: "In test with NULLs",
inputTuples: tuples{{1}, {2}, {nil}},
outputTuples: tuples{{true}, {nil}, {nil}},
filterRow: []int64{1},
hasNulls: true,
negate: false,
},
{
desc: "Not in test with NULLs",
inputTuples: tuples{{1}, {2}, {nil}},
outputTuples: tuples{{false}, {nil}, {nil}},
filterRow: []int64{1},
hasNulls: true,
negate: true,
},
{
desc: "Not in test with NULLs and no nulls in filter",
inputTuples: tuples{{1}, {2}, {nil}},
outputTuples: tuples{{false}, {true}, {nil}},
filterRow: []int64{1},
hasNulls: false,
negate: true,
},
{
desc: "Test with false values",
inputTuples: tuples{{1}, {2}},
outputTuples: tuples{{false}, {false}},
filterRow: []int64{3},
hasNulls: false,
negate: false,
},
}

for _, c := range testCases {
t.Run(c.desc, func(t *testing.T) {
runTests(t, []tuples{c.inputTuples}, c.outputTuples, orderedVerifier, []int{1},
func(input []Operator) (Operator, error) {
op := projectInOpInt64{
input: input[0],
colIdx: 0,
outputIdx: 1,
filterRow: c.filterRow,
negate: c.negate,
hasNulls: c.hasNulls,
}
return &op, nil
})
})
}
}
134 changes: 126 additions & 8 deletions pkg/sql/exec/select_in_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,30 @@ const (
siNull
)

func GetInProjectionOperator(
ct *semtypes.T, input Operator, colIdx int, resultIdx int, datumTuple *tree.DTuple, negate bool,
) (Operator, error) {
var err error
switch t := conv.FromColumnType(ct); t {
// {{range .}}
case types._TYPE:
obj := &projectInOp_TYPE{
input: input,
colIdx: colIdx,
outputIdx: resultIdx,
negate: negate,
}
obj.filterRow, obj.hasNulls, err = fillDatumRow_TYPE(ct, datumTuple)
if err != nil {
return nil, err
}
return obj, nil
// {{end}}
default:
return nil, errors.Errorf("unhandled type: %s", t)
}
}

func GetInOperator(
ct *semtypes.T, input Operator, colIdx int, datumTuple *tree.DTuple, negate bool,
) (Operator, error) {
Expand Down Expand Up @@ -94,6 +118,15 @@ type selectInOp_TYPE struct {
negate bool
}

type projectInOp_TYPE struct {
input Operator
colIdx int
outputIdx int
filterRow []_GOTYPE
hasNulls bool
negate bool
}

func fillDatumRow_TYPE(ct *semtypes.T, datumTuple *tree.DTuple) ([]_GOTYPE, bool, error) {
conv := conv.GetDatumToPhysicalFn(ct)
var result []_GOTYPE
Expand All @@ -113,15 +146,15 @@ func fillDatumRow_TYPE(ct *semtypes.T, datumTuple *tree.DTuple) ([]_GOTYPE, bool
return result, hasNulls, nil
}

func (si *selectInOp_TYPE) cmpIn_TYPE(target _GOTYPE) comparisonResult {
for i := range si.filterRow {
func cmpIn_TYPE(target _GOTYPE, filterRow []_GOTYPE, hasNulls bool) comparisonResult {
for i := range filterRow {
var cmp bool
_ASSIGN_EQ(cmp, target, si.filterRow[i])
_ASSIGN_EQ(cmp, target, filterRow[i])
if cmp {
return siTrue
}
}
if si.hasNulls {
if hasNulls {
return siNull
} else {
return siFalse
Expand All @@ -132,6 +165,10 @@ func (si *selectInOp_TYPE) Init() {
si.input.Init()
}

func (pi *projectInOp_TYPE) Init() {
pi.input.Init()
}

func (si *selectInOp_TYPE) Next(ctx context.Context) coldata.Batch {
for {
batch := si.input.Next(ctx)
Expand All @@ -154,7 +191,7 @@ func (si *selectInOp_TYPE) Next(ctx context.Context) coldata.Batch {
if sel := batch.Selection(); sel != nil {
sel = sel[:n]
for _, i := range sel {
if !nulls.NullAt(uint16(i)) && si.cmpIn_TYPE(col[i]) == compVal {
if !nulls.NullAt(uint16(i)) && cmpIn_TYPE(col[i], si.filterRow, si.hasNulls) == compVal {
sel[idx] = uint16(i)
idx++
}
Expand All @@ -164,7 +201,7 @@ func (si *selectInOp_TYPE) Next(ctx context.Context) coldata.Batch {
sel := batch.Selection()
col = col[:n]
for i := range col {
if !nulls.NullAt(uint16(i)) && si.cmpIn_TYPE(col[i]) == compVal {
if !nulls.NullAt(uint16(i)) && cmpIn_TYPE(col[i], si.filterRow, si.hasNulls) == compVal {
sel[idx] = uint16(i)
idx++
}
Expand All @@ -174,7 +211,7 @@ func (si *selectInOp_TYPE) Next(ctx context.Context) coldata.Batch {
if sel := batch.Selection(); sel != nil {
sel = sel[:n]
for _, i := range sel {
if si.cmpIn_TYPE(col[i]) == compVal {
if cmpIn_TYPE(col[i], si.filterRow, si.hasNulls) == compVal {
sel[idx] = uint16(i)
idx++
}
Expand All @@ -184,7 +221,7 @@ func (si *selectInOp_TYPE) Next(ctx context.Context) coldata.Batch {
sel := batch.Selection()
col = col[:n]
for i := range col {
if si.cmpIn_TYPE(col[i]) == compVal {
if cmpIn_TYPE(col[i], si.filterRow, si.hasNulls) == compVal {
sel[idx] = uint16(i)
idx++
}
Expand All @@ -199,4 +236,85 @@ func (si *selectInOp_TYPE) Next(ctx context.Context) coldata.Batch {
}
}

func (pi *projectInOp_TYPE) Next(ctx context.Context) coldata.Batch {
batch := pi.input.Next(ctx)
if batch.Length() == 0 {
return batch
}

if pi.outputIdx == batch.Width() {
batch.AppendCol(types.Bool)
}

vec := batch.ColVec(pi.colIdx)
col := vec._TemplateType()[:coldata.BatchSize]

projVec := batch.ColVec(pi.outputIdx)
projCol := projVec.Bool()[:coldata.BatchSize]
projNulls := projVec.Nulls()

n := batch.Length()

cmpVal := siTrue
if pi.negate {
cmpVal = siFalse
}

if vec.HasNulls() {
nulls := vec.Nulls()
if sel := batch.Selection(); sel != nil {
sel = sel[:n]
for _, i := range sel {
if nulls.NullAt(uint16(i)) {
projNulls.SetNull(uint16(i))
} else {
cmpRes := cmpIn_TYPE(col[i], pi.filterRow, pi.hasNulls)
if cmpRes == siNull {
projNulls.SetNull(uint16(i))
} else {
projCol[i] = cmpRes == cmpVal
}
}
}
} else {
col = col[:n]
for i := range col {
if nulls.NullAt(uint16(i)) {
projNulls.SetNull(uint16(i))
} else {
cmpRes := cmpIn_TYPE(col[i], pi.filterRow, pi.hasNulls)
if cmpRes == siNull {
projNulls.SetNull(uint16(i))
} else {
projCol[i] = cmpRes == cmpVal
}
}
}
}
} else {
if sel := batch.Selection(); sel != nil {
sel = sel[:n]
for _, i := range sel {
cmpRes := cmpIn_TYPE(col[i], pi.filterRow, pi.hasNulls)
if cmpRes == siNull {
projNulls.SetNull(uint16(i))
} else {
projCol[i] = cmpRes == cmpVal
}
}
} else {
col = col[:n]
for i := range col {
cmpRes := cmpIn_TYPE(col[i], pi.filterRow, pi.hasNulls)
if cmpRes == siNull {
projNulls.SetNull(uint16(i))
} else {
projCol[i] = cmpRes == cmpVal
}
}
}
}
return batch
}

// {{end}}

0 comments on commit 712c2db

Please sign in to comment.