From 7948c125ac9db4bcee1950f285df5733deeae09d Mon Sep 17 00:00:00 2001 From: Zhuhe Fang Date: Fri, 18 Sep 2020 12:19:52 +0800 Subject: [PATCH] expression: Short cut expr vec bug (#19775) --- expression/builtin_compare_vec_generated.go | 232 ++++++++++++++++++-- expression/builtin_op_vec.go | 60 ++++- expression/generator/compare_vec.go | 63 +++++- expression/integration_test.go | 6 + 4 files changed, 330 insertions(+), 31 deletions(-) diff --git a/expression/builtin_compare_vec_generated.go b/expression/builtin_compare_vec_generated.go index 4e100ff9d1ef2..5f627ff8ed385 100644 --- a/expression/builtin_compare_vec_generated.go +++ b/expression/builtin_compare_vec_generated.go @@ -1757,6 +1757,28 @@ func (b *builtinNullEQJSONSig) vectorized() bool { return true } +// NOTE: Coalesce just return the first non-null item, but vectorization do each item, which would incur additional errors. If this case happen, +// the vectorization falls back to the scalar execution. +func (b *builtinCoalesceIntSig) fallbackEvalInt(input *chunk.Chunk, result *chunk.Column) error { + n := input.NumRows() + + x := result.Int64s() + for i := 0; i < n; i++ { + res, isNull, err := b.evalInt(input.GetRow(i)) + if err != nil { + return err + } + result.SetNull(i, isNull) + if isNull { + continue + } + + x[i] = res + + } + return nil +} + func (b *builtinCoalesceIntSig) vecEvalInt(input *chunk.Chunk, result *chunk.Column) error { n := input.NumRows() result.ResizeInt64(n, true) @@ -1766,10 +1788,16 @@ func (b *builtinCoalesceIntSig) vecEvalInt(input *chunk.Chunk, result *chunk.Col return err } defer b.bufAllocator.put(buf1) + sc := b.ctx.GetSessionVars().StmtCtx + beforeWarns := sc.WarningCount() for j := 0; j < len(b.args); j++ { - - if err := b.args[j].VecEvalInt(b.ctx, input, buf1); err != nil { - return err + err := b.args[j].VecEvalInt(b.ctx, input, buf1) + afterWarns := sc.WarningCount() + if err != nil || afterWarns > beforeWarns { + if afterWarns > beforeWarns { + sc.TruncateWarnings(int(beforeWarns)) + } + return b.fallbackEvalInt(input, result) } args := buf1.Int64s() for i := 0; i < n; i++ { @@ -1786,6 +1814,28 @@ func (b *builtinCoalesceIntSig) vectorized() bool { return true } +// NOTE: Coalesce just return the first non-null item, but vectorization do each item, which would incur additional errors. If this case happen, +// the vectorization falls back to the scalar execution. +func (b *builtinCoalesceRealSig) fallbackEvalReal(input *chunk.Chunk, result *chunk.Column) error { + n := input.NumRows() + + x := result.Float64s() + for i := 0; i < n; i++ { + res, isNull, err := b.evalReal(input.GetRow(i)) + if err != nil { + return err + } + result.SetNull(i, isNull) + if isNull { + continue + } + + x[i] = res + + } + return nil +} + func (b *builtinCoalesceRealSig) vecEvalReal(input *chunk.Chunk, result *chunk.Column) error { n := input.NumRows() result.ResizeFloat64(n, true) @@ -1795,10 +1845,16 @@ func (b *builtinCoalesceRealSig) vecEvalReal(input *chunk.Chunk, result *chunk.C return err } defer b.bufAllocator.put(buf1) + sc := b.ctx.GetSessionVars().StmtCtx + beforeWarns := sc.WarningCount() for j := 0; j < len(b.args); j++ { - - if err := b.args[j].VecEvalReal(b.ctx, input, buf1); err != nil { - return err + err := b.args[j].VecEvalReal(b.ctx, input, buf1) + afterWarns := sc.WarningCount() + if err != nil || afterWarns > beforeWarns { + if afterWarns > beforeWarns { + sc.TruncateWarnings(int(beforeWarns)) + } + return b.fallbackEvalReal(input, result) } args := buf1.Float64s() for i := 0; i < n; i++ { @@ -1815,6 +1871,28 @@ func (b *builtinCoalesceRealSig) vectorized() bool { return true } +// NOTE: Coalesce just return the first non-null item, but vectorization do each item, which would incur additional errors. If this case happen, +// the vectorization falls back to the scalar execution. +func (b *builtinCoalesceDecimalSig) fallbackEvalDecimal(input *chunk.Chunk, result *chunk.Column) error { + n := input.NumRows() + + x := result.Decimals() + for i := 0; i < n; i++ { + res, isNull, err := b.evalDecimal(input.GetRow(i)) + if err != nil { + return err + } + result.SetNull(i, isNull) + if isNull { + continue + } + + x[i] = *res + + } + return nil +} + func (b *builtinCoalesceDecimalSig) vecEvalDecimal(input *chunk.Chunk, result *chunk.Column) error { n := input.NumRows() result.ResizeDecimal(n, true) @@ -1824,10 +1902,16 @@ func (b *builtinCoalesceDecimalSig) vecEvalDecimal(input *chunk.Chunk, result *c return err } defer b.bufAllocator.put(buf1) + sc := b.ctx.GetSessionVars().StmtCtx + beforeWarns := sc.WarningCount() for j := 0; j < len(b.args); j++ { - - if err := b.args[j].VecEvalDecimal(b.ctx, input, buf1); err != nil { - return err + err := b.args[j].VecEvalDecimal(b.ctx, input, buf1) + afterWarns := sc.WarningCount() + if err != nil || afterWarns > beforeWarns { + if afterWarns > beforeWarns { + sc.TruncateWarnings(int(beforeWarns)) + } + return b.fallbackEvalDecimal(input, result) } args := buf1.Decimals() for i := 0; i < n; i++ { @@ -1844,12 +1928,33 @@ func (b *builtinCoalesceDecimalSig) vectorized() bool { return true } +// NOTE: Coalesce just return the first non-null item, but vectorization do each item, which would incur additional errors. If this case happen, +// the vectorization falls back to the scalar execution. +func (b *builtinCoalesceStringSig) fallbackEvalString(input *chunk.Chunk, result *chunk.Column) error { + n := input.NumRows() + + result.ReserveString(n) + for i := 0; i < n; i++ { + res, isNull, err := b.evalString(input.GetRow(i)) + if err != nil { + return err + } + if isNull { + result.AppendNull() + continue + } + result.AppendString(res) + } + return nil +} + func (b *builtinCoalesceStringSig) vecEvalString(input *chunk.Chunk, result *chunk.Column) error { n := input.NumRows() argLen := len(b.args) bufs := make([]*chunk.Column, argLen) - + sc := b.ctx.GetSessionVars().StmtCtx + beforeWarns := sc.WarningCount() for i := 0; i < argLen; i++ { buf, err := b.bufAllocator.get(types.ETInt, n) if err != nil { @@ -1857,8 +1962,12 @@ func (b *builtinCoalesceStringSig) vecEvalString(input *chunk.Chunk, result *chu } defer b.bufAllocator.put(buf) err = b.args[i].VecEvalString(b.ctx, input, buf) - if err != nil { - return err + afterWarns := sc.WarningCount() + if err != nil || afterWarns > beforeWarns { + if afterWarns > beforeWarns { + sc.TruncateWarnings(int(beforeWarns)) + } + return b.fallbackEvalString(input, result) } bufs[i] = buf } @@ -1882,6 +1991,28 @@ func (b *builtinCoalesceStringSig) vectorized() bool { return true } +// NOTE: Coalesce just return the first non-null item, but vectorization do each item, which would incur additional errors. If this case happen, +// the vectorization falls back to the scalar execution. +func (b *builtinCoalesceTimeSig) fallbackEvalTime(input *chunk.Chunk, result *chunk.Column) error { + n := input.NumRows() + + x := result.Times() + for i := 0; i < n; i++ { + res, isNull, err := b.evalTime(input.GetRow(i)) + if err != nil { + return err + } + result.SetNull(i, isNull) + if isNull { + continue + } + + x[i] = res + + } + return nil +} + func (b *builtinCoalesceTimeSig) vecEvalTime(input *chunk.Chunk, result *chunk.Column) error { n := input.NumRows() result.ResizeTime(n, true) @@ -1891,10 +2022,16 @@ func (b *builtinCoalesceTimeSig) vecEvalTime(input *chunk.Chunk, result *chunk.C return err } defer b.bufAllocator.put(buf1) + sc := b.ctx.GetSessionVars().StmtCtx + beforeWarns := sc.WarningCount() for j := 0; j < len(b.args); j++ { - - if err := b.args[j].VecEvalTime(b.ctx, input, buf1); err != nil { - return err + err := b.args[j].VecEvalTime(b.ctx, input, buf1) + afterWarns := sc.WarningCount() + if err != nil || afterWarns > beforeWarns { + if afterWarns > beforeWarns { + sc.TruncateWarnings(int(beforeWarns)) + } + return b.fallbackEvalTime(input, result) } args := buf1.Times() for i := 0; i < n; i++ { @@ -1911,6 +2048,28 @@ func (b *builtinCoalesceTimeSig) vectorized() bool { return true } +// NOTE: Coalesce just return the first non-null item, but vectorization do each item, which would incur additional errors. If this case happen, +// the vectorization falls back to the scalar execution. +func (b *builtinCoalesceDurationSig) fallbackEvalDuration(input *chunk.Chunk, result *chunk.Column) error { + n := input.NumRows() + + x := result.GoDurations() + for i := 0; i < n; i++ { + res, isNull, err := b.evalDuration(input.GetRow(i)) + if err != nil { + return err + } + result.SetNull(i, isNull) + if isNull { + continue + } + + x[i] = res.Duration + + } + return nil +} + func (b *builtinCoalesceDurationSig) vecEvalDuration(input *chunk.Chunk, result *chunk.Column) error { n := input.NumRows() result.ResizeGoDuration(n, true) @@ -1920,10 +2079,16 @@ func (b *builtinCoalesceDurationSig) vecEvalDuration(input *chunk.Chunk, result return err } defer b.bufAllocator.put(buf1) + sc := b.ctx.GetSessionVars().StmtCtx + beforeWarns := sc.WarningCount() for j := 0; j < len(b.args); j++ { - - if err := b.args[j].VecEvalDuration(b.ctx, input, buf1); err != nil { - return err + err := b.args[j].VecEvalDuration(b.ctx, input, buf1) + afterWarns := sc.WarningCount() + if err != nil || afterWarns > beforeWarns { + if afterWarns > beforeWarns { + sc.TruncateWarnings(int(beforeWarns)) + } + return b.fallbackEvalDuration(input, result) } args := buf1.GoDurations() for i := 0; i < n; i++ { @@ -1940,12 +2105,33 @@ func (b *builtinCoalesceDurationSig) vectorized() bool { return true } +// NOTE: Coalesce just return the first non-null item, but vectorization do each item, which would incur additional errors. If this case happen, +// the vectorization falls back to the scalar execution. +func (b *builtinCoalesceJSONSig) fallbackEvalJSON(input *chunk.Chunk, result *chunk.Column) error { + n := input.NumRows() + + result.ReserveJSON(n) + for i := 0; i < n; i++ { + res, isNull, err := b.evalJSON(input.GetRow(i)) + if err != nil { + return err + } + if isNull { + result.AppendNull() + continue + } + result.AppendJSON(res) + } + return nil +} + func (b *builtinCoalesceJSONSig) vecEvalJSON(input *chunk.Chunk, result *chunk.Column) error { n := input.NumRows() argLen := len(b.args) bufs := make([]*chunk.Column, argLen) - + sc := b.ctx.GetSessionVars().StmtCtx + beforeWarns := sc.WarningCount() for i := 0; i < argLen; i++ { buf, err := b.bufAllocator.get(types.ETInt, n) if err != nil { @@ -1953,8 +2139,12 @@ func (b *builtinCoalesceJSONSig) vecEvalJSON(input *chunk.Chunk, result *chunk.C } defer b.bufAllocator.put(buf) err = b.args[i].VecEvalJSON(b.ctx, input, buf) - if err != nil { - return err + afterWarns := sc.WarningCount() + if err != nil || afterWarns > beforeWarns { + if afterWarns > beforeWarns { + sc.TruncateWarnings(int(beforeWarns)) + } + return b.fallbackEvalJSON(input, result) } bufs[i] = buf } diff --git a/expression/builtin_op_vec.go b/expression/builtin_op_vec.go index d7843eef2fc1e..74e5e150aeed8 100644 --- a/expression/builtin_op_vec.go +++ b/expression/builtin_op_vec.go @@ -54,6 +54,24 @@ func (b *builtinLogicOrSig) vectorized() bool { return true } +func (b *builtinLogicOrSig) fallbackEvalInt(input *chunk.Chunk, result *chunk.Column) error { + n := input.NumRows() + result.ResizeInt64(n, false) + x := result.Int64s() + for i := 0; i < n; i++ { + res, isNull, err := b.evalInt(input.GetRow(i)) + if err != nil { + return err + } + result.SetNull(i, isNull) + if isNull { + continue + } + x[i] = res + } + return nil +} + func (b *builtinLogicOrSig) vecEvalInt(input *chunk.Chunk, result *chunk.Column) error { if err := b.args[0].VecEvalInt(b.ctx, input, result); err != nil { return err @@ -65,8 +83,16 @@ func (b *builtinLogicOrSig) vecEvalInt(input *chunk.Chunk, result *chunk.Column) return err } defer b.bufAllocator.put(buf) - if err := b.args[1].VecEvalInt(b.ctx, input, buf); err != nil { - return err + + sc := b.ctx.GetSessionVars().StmtCtx + beforeWarns := sc.WarningCount() + err = b.args[1].VecEvalInt(b.ctx, input, buf) + afterWarns := sc.WarningCount() + if err != nil || afterWarns > beforeWarns { + if afterWarns > beforeWarns { + sc.TruncateWarnings(int(beforeWarns)) + } + return b.fallbackEvalInt(input, result) } i64s := result.Int64s() @@ -322,6 +348,24 @@ func (b *builtinLogicAndSig) vectorized() bool { return true } +func (b *builtinLogicAndSig) fallbackEvalInt(input *chunk.Chunk, result *chunk.Column) error { + n := input.NumRows() + result.ResizeInt64(n, false) + x := result.Int64s() + for i := 0; i < n; i++ { + res, isNull, err := b.evalInt(input.GetRow(i)) + if err != nil { + return err + } + result.SetNull(i, isNull) + if isNull { + continue + } + x[i] = res + } + return nil +} + func (b *builtinLogicAndSig) vecEvalInt(input *chunk.Chunk, result *chunk.Column) error { n := input.NumRows() @@ -334,8 +378,16 @@ func (b *builtinLogicAndSig) vecEvalInt(input *chunk.Chunk, result *chunk.Column return err } defer b.bufAllocator.put(buf1) - if err := b.args[1].VecEvalInt(b.ctx, input, buf1); err != nil { - return err + + sc := b.ctx.GetSessionVars().StmtCtx + beforeWarns := sc.WarningCount() + err = b.args[1].VecEvalInt(b.ctx, input, buf1) + afterWarns := sc.WarningCount() + if err != nil || afterWarns > beforeWarns { + if afterWarns > beforeWarns { + sc.TruncateWarnings(int(beforeWarns)) + } + return b.fallbackEvalInt(input, result) } i64s := result.Int64s() diff --git a/expression/generator/compare_vec.go b/expression/generator/compare_vec.go index 0f22af2f3956a..5864e61c02c3c 100644 --- a/expression/generator/compare_vec.go +++ b/expression/generator/compare_vec.go @@ -171,7 +171,47 @@ func (b *builtin{{ .compare.CompareName }}{{ .type.TypeName }}Sig) vectorized() `)) var builtinCoalesceCompareVecTpl = template.Must(template.New("").Parse(` +// NOTE: Coalesce just return the first non-null item, but vectorization do each item, which would incur additional errors. If this case happen, +// the vectorization falls back to the scalar execution. +func (b *builtin{{ .compare.CompareName }}{{ .type.TypeName }}Sig) fallbackEval{{ .type.TypeName }}(input *chunk.Chunk, result *chunk.Column) error { + n := input.NumRows() {{ if .type.Fixed }} + x := result.{{ .type.TypeNameInColumn }}s() + for i := 0; i < n; i++ { + res, isNull, err := b.eval{{ .type.TypeName }}(input.GetRow(i)) + if err != nil { + return err + } + result.SetNull(i, isNull) + if isNull { + continue + } + {{ if eq .type.TypeName "Decimal" }} + x[i] = *res + {{ else if eq .type.TypeName "Duration" }} + x[i] = res.Duration + {{ else }} + x[i] = res + {{ end }} + } + {{ else }} + result.Reserve{{ .type.TypeNameInColumn }}(n) + for i := 0; i < n; i++ { + res, isNull, err := b.eval{{ .type.TypeName }}(input.GetRow(i)) + if err != nil { + return err + } + if isNull { + result.AppendNull() + continue + } + result.Append{{ .type.TypeNameInColumn }}(res) + } + {{ end -}} + return nil +} + +{{ if .type.Fixed }} func (b *builtin{{ .compare.CompareName }}{{ .type.TypeName }}Sig) vecEval{{ .type.TypeName }}(input *chunk.Chunk, result *chunk.Column) error { n := input.NumRows() result.Resize{{ .type.TypeNameInColumn }}(n, true) @@ -181,10 +221,16 @@ func (b *builtin{{ .compare.CompareName }}{{ .type.TypeName }}Sig) vecEval{{ .ty return err } defer b.bufAllocator.put(buf1) + sc := b.ctx.GetSessionVars().StmtCtx + beforeWarns := sc.WarningCount() for j := 0; j < len(b.args); j++{ - - if err := b.args[j].VecEval{{ .type.TypeName }}(b.ctx, input, buf1); err != nil { - return err + err := b.args[j].VecEval{{ .type.TypeName }}(b.ctx, input, buf1) + afterWarns := sc.WarningCount() + if err != nil || afterWarns > beforeWarns { + if afterWarns > beforeWarns { + sc.TruncateWarnings(int(beforeWarns)) + } + return b.fallbackEval{{ .type.TypeName }}(input, result) } args := buf1.{{ .type.TypeNameInColumn }}s() for i := 0; i < n; i++ { @@ -202,7 +248,8 @@ func (b *builtin{{ .compare.CompareName }}{{ .type.TypeName }}Sig) vecEval{{ .ty argLen := len(b.args) bufs := make([]*chunk.Column, argLen) - + sc := b.ctx.GetSessionVars().StmtCtx + beforeWarns := sc.WarningCount() for i := 0; i < argLen; i++ { buf, err := b.bufAllocator.get(types.ETInt, n) if err != nil { @@ -210,8 +257,12 @@ func (b *builtin{{ .compare.CompareName }}{{ .type.TypeName }}Sig) vecEval{{ .ty } defer b.bufAllocator.put(buf) err = b.args[i].VecEval{{ .type.TypeName }}(b.ctx, input, buf) - if err != nil { - return err + afterWarns := sc.WarningCount() + if err != nil || afterWarns > beforeWarns { + if afterWarns > beforeWarns { + sc.TruncateWarnings(int(beforeWarns)) + } + return b.fallbackEval{{ .type.TypeName }}(input, result) } bufs[i]=buf } diff --git a/expression/integration_test.go b/expression/integration_test.go index 7498a4f48297c..3418588e44991 100755 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -2830,6 +2830,12 @@ func (s *testIntegrationSuite2) TestBuiltin(c *C) { tk.MustQuery("show warnings").Check(testkit.Rows()) tk.MustQuery("select ifnull(b, 1/0) from t") tk.MustQuery("show warnings").Check(testkit.Rows()) + tk.MustQuery("select COALESCE(1, b, b/0) from t") + tk.MustQuery("show warnings").Check(testkit.Rows()) + tk.MustQuery("select 0 and b/0 from t") + tk.MustQuery("show warnings").Check(testkit.Rows()) + tk.MustQuery("select 1 or b/0 from t") + tk.MustQuery("show warnings").Check(testkit.Rows()) tk.MustQuery("select case 2.0 when 2.0 then 3.0 when 3.0 then 2.0 end").Check(testkit.Rows("3.0")) tk.MustQuery("select case 2.0 when 3.0 then 2.0 when 4.0 then 3.0 else 5.0 end").Check(testkit.Rows("5.0"))