Skip to content

Commit

Permalink
Merge #38941
Browse files Browse the repository at this point in the history
38941: exec: fix a panic with sum_int aggregate with input types other than Int64 r=yuzefovich a=yuzefovich

First commit wraps Next of materializer with a vectorized panic catcher.

We used to catch panics only when calling Next() on the input to
materializer. However, it is also possible that types get messed up
during planning a vectorized flow which can lead to a type conversion
panic. We do not want to crash in such a case either, so we wrap the full
Next() method with a catcher.

Second commit falls back to DistSQL on sum_int if not on Int64.

Currently, our aggregates expect that input and output types are
the same. However, sum_int is planned such that flow consumer
expects Int64 regardless of the input types. Supporting this would
require adding specialized sum aggregates that would take in any
int type and would output Int64. It is easier for now to just
fall back to DistSQL on non-int64 input types.

Fixes: #38937.

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
  • Loading branch information
craig[bot] and yuzefovich committed Jul 17, 2019
2 parents 7cc3442 + 99b37e4 commit e62629b
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 10 deletions.
5 changes: 5 additions & 0 deletions pkg/sql/distsqlrun/column_exec_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ func newColOperator(
// issues, at first, we could plan SUM for all types besides Int64.
return nil, nil, memUsage, errors.Newf("sum on int cols not supported (use sum_int)")
}
case distsqlpb.AggregatorSpec_SUM_INT:
// TODO(yuzefovich): support this case through vectorize.
if aggTyps[i][0].Width() != 64 {
return nil, nil, memUsage, errors.Newf("sum_int is only supported on Int64 through vectorized")
}
}
_, retType, err := GetAggregateInfo(agg.Func, aggTyps[i]...)
if err != nil {
Expand Down
32 changes: 22 additions & 10 deletions pkg/sql/distsqlrun/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ type materializer struct {

// row is the memory used for the output row.
row sqlbase.EncDatumRow

// Fields to store the returned results of next() to be passed through an
// adapter.
outputRow sqlbase.EncDatumRow
outputMetadata *distsqlpb.ProducerMetadata
}

func newMaterializer(
Expand Down Expand Up @@ -95,21 +100,20 @@ func (m *materializer) Start(ctx context.Context) context.Context {
return ctx
}

// nextBatch saves the next batch from input in m.batch. For internal use only.
// The purpose of having this function is to not create an anonymous function
// on every call to Next().
func (m *materializer) nextBatch() {
m.batch = m.input.Next(m.Ctx)
// nextAdapter calls next() and saves the returned results in m. For internal
// use only. The purpose of having this function is to not create an anonymous
// function on every call to Next().
func (m *materializer) nextAdapter() {
// Stash both results in fields: this method returns nothing, so Next() reads
// m.outputRow and m.outputMetadata after the catcher has run it.
m.outputRow, m.outputMetadata = m.next()
}

func (m *materializer) Next() (sqlbase.EncDatumRow, *distsqlpb.ProducerMetadata) {
// next is the logic of Next() extracted in a separate method to be used by an
// adapter to be able to wrap the latter with a catcher.
func (m *materializer) next() (sqlbase.EncDatumRow, *distsqlpb.ProducerMetadata) {
for m.State == StateRunning {
if m.batch == nil || m.curIdx >= m.batch.Length() {
// Get a fresh batch.
if err := exec.CatchVectorizedRuntimeError(m.nextBatch); err != nil {
m.MoveToDraining(err)
return nil, m.DrainHelper()
}
m.batch = m.input.Next(m.Ctx)

if m.batch.Length() == 0 {
m.MoveToDraining(nil /* err */)
Expand Down Expand Up @@ -145,6 +149,14 @@ func (m *materializer) Next() (sqlbase.EncDatumRow, *distsqlpb.ProducerMetadata)
return nil, m.DrainHelper()
}

// Next runs nextAdapter under exec.CatchVectorizedRuntimeError. If the
// catcher reports an error, the materializer moves to draining with that
// error and a nil row is returned together with the metadata from
// DrainHelper. Otherwise the row and metadata saved by nextAdapter are
// returned.
func (m *materializer) Next() (sqlbase.EncDatumRow, *distsqlpb.ProducerMetadata) {
if err := exec.CatchVectorizedRuntimeError(m.nextAdapter); err != nil {
// m.outputRow and m.outputMetadata are not used on this path.
m.MoveToDraining(err)
return nil, m.DrainHelper()
}
return m.outputRow, m.outputMetadata
}

// ConsumerClosed delegates to m.InternalClose().
// NOTE(review): presumably part of the row-source interface the materializer
// implements; confirm against the interface definition.
func (m *materializer) ConsumerClosed() {
m.InternalClose()
}
9 changes: 9 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/vectorize
Original file line number Diff line number Diff line change
Expand Up @@ -515,3 +515,12 @@ query I
SELECT EXTRACT(YEAR FROM x) FROM extract_test
----
2017

# Regression test for #38937
statement ok
CREATE TABLE t38937 (_int2) AS SELECT 1::INT2

query I
SELECT sum_int(_int2) FROM t38937
----
1

0 comments on commit e62629b

Please sign in to comment.