Skip to content

Commit

Permalink
Merge branch 'master' into c_temp_if_notexists
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao committed Jul 7, 2021
2 parents 6800748 + 383fb9a commit 7dff9fe
Show file tree
Hide file tree
Showing 38 changed files with 1,601 additions and 144 deletions.
3 changes: 1 addition & 2 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/pingcap/tidb/tablecodec"
tidb_util "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/gcutil"
"github.com/tikv/client-go/v2/tikv"
)

const tiflashCheckTiDBHTTPAPIHalfInterval = 2500 * time.Millisecond
Expand Down Expand Up @@ -330,7 +329,7 @@ func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in

failpoint.Inject("mockRecoverTableCommitErr", func(val failpoint.Value) {
if val.(bool) && atomic.CompareAndSwapUint32(&mockRecoverTableCommitErrOnce, 0, 1) {
tikv.MockCommitErrorEnable()
err = failpoint.Enable(`tikvclient/mockCommitErrorOpt`, "return(true)")
}
})

Expand Down
2 changes: 2 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,8 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo
}
// Reset DurationParse due to the next statement may not need to be parsed (not a text protocol query).
sessVars.DurationParse = 0
// Clean the stale read flag when statement execution finish
sessVars.StmtCtx.IsStaleness = false
}

// CloseRecordSet will finish the execution of current statement and do some record work
Expand Down
2 changes: 1 addition & 1 deletion executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,7 @@ func (e AnalyzeColumnsExec) decodeSampleDataWithVirtualColumn(
totFts = append(totFts, col.RetType)
}
chk := chunk.NewChunkWithCapacity(totFts, len(collector.Samples))
decoder := codec.NewDecoder(chk, e.ctx.GetSessionVars().TimeZone)
decoder := codec.NewDecoder(chk, e.ctx.GetSessionVars().Location())
for _, sample := range collector.Samples {
for i := range sample.Columns {
if schema.Columns[i].VirtualExpr != nil {
Expand Down
10 changes: 10 additions & 0 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ package executor

import (
"context"
"fmt"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
Expand Down Expand Up @@ -63,6 +65,14 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
return nil, err
}

failpoint.Inject("assertStmtCtxIsStaleness", func(val failpoint.Value) {
expected := val.(bool)
got := c.Ctx.GetSessionVars().StmtCtx.IsStaleness
if got != expected {
panic(fmt.Sprintf("stmtctx isStaleness wrong, expected:%v, got:%v", expected, got))
}
})

CountStmtNode(stmtNode, c.Ctx.GetSessionVars().InRestrictedSQL)
var lowerPriority bool
if c.Ctx.GetSessionVars().StmtCtx.Priority == mysql.NoPriority {
Expand Down
1 change: 1 addition & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1662,6 +1662,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
DiskTracker: disk.NewTracker(memory.LabelForSQLText, -1),
TaskID: stmtctx.AllocateTaskID(),
CTEStorageMap: map[int]*CTEStorages{},
IsStaleness: false,
}
sc.MemTracker.AttachToGlobalTracker(GlobalMemoryUsageTracker)
globalConfig := config.GetGlobalConfig()
Expand Down
10 changes: 10 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,16 @@ func (s *testSerialSuite1) TestSetVar(c *C) {
tk.MustQuery(`show warnings`).Check(testkit.Rows())
tk.MustExec("set @@tidb_enable_clustered_index = 'int_only'")
tk.MustQuery(`show warnings`).Check(testkit.Rows("Warning 1287 'INT_ONLY' is deprecated and will be removed in a future release. Please use 'ON' or 'OFF' instead"))

// test for tidb_enable_stable_result_mode
tk.MustQuery(`select @@tidb_enable_stable_result_mode`).Check(testkit.Rows("0"))
tk.MustExec(`set global tidb_enable_stable_result_mode = 1`)
tk.MustQuery(`select @@global.tidb_enable_stable_result_mode`).Check(testkit.Rows("1"))
tk.MustExec(`set global tidb_enable_stable_result_mode = 0`)
tk.MustQuery(`select @@global.tidb_enable_stable_result_mode`).Check(testkit.Rows("0"))
tk.MustExec(`set tidb_enable_stable_result_mode=1`)
tk.MustQuery(`select @@global.tidb_enable_stable_result_mode`).Check(testkit.Rows("0"))
tk.MustQuery(`select @@tidb_enable_stable_result_mode`).Check(testkit.Rows("1"))
}

func (s *testSuite5) TestTruncateIncorrectIntSessionVar(c *C) {
Expand Down
95 changes: 95 additions & 0 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ func (s *testStaleTxnSerialSuite) TestSelectAsOf(c *C) {
c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, uint64(0))
}
}
failpoint.Disable("github.com/pingcap/tidb/executor/assertStaleTSO")
failpoint.Disable("github.com/pingcap/tidb/executor/assertStaleTSOWithTolerance")
}

func (s *testStaleTxnSerialSuite) TestStaleReadKVRequest(c *C) {
Expand Down Expand Up @@ -996,3 +998,96 @@ func (s *testStaleTxnSerialSuite) TestStaleReadPrepare(c *C) {
tk.MustExec(fmt.Sprintf(`set transaction read only as of timestamp '%s'`, time1.Format("2006-1-2 15:04:05.000")))
c.Assert("execute p1", NotNil)
}

func (s *testStaleTxnSuite) TestStmtCtxStaleFlag(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
defer tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int)")
time.Sleep(2 * time.Second)
time1 := time.Now().Format("2006-1-2 15:04:05")
testcases := []struct {
sql string
hasStaleFlag bool
}{
// assert select as of statement
{
sql: fmt.Sprintf("select * from t as of timestamp '%v'", time1),
hasStaleFlag: true,
},
// assert select statement
{
sql: fmt.Sprintf("select * from t"),
hasStaleFlag: false,
},
// assert select statement in stale transaction
{
sql: fmt.Sprintf("start transaction read only as of timestamp '%v'", time1),
hasStaleFlag: false,
},
{
sql: fmt.Sprintf("select * from t"),
hasStaleFlag: true,
},
{
sql: "commit",
hasStaleFlag: false,
},
// assert select statement after set transaction
{
sql: fmt.Sprintf("set transaction read only as of timestamp '%v'", time1),
hasStaleFlag: false,
},
{
sql: fmt.Sprintf("select * from t"),
hasStaleFlag: true,
},
// assert select statement after consumed set transaction
{
sql: fmt.Sprintf("select * from t"),
hasStaleFlag: false,
},
// assert prepare statement with select as of statement
{
sql: fmt.Sprintf(`prepare p from 'select * from t as of timestamp "%v"'`, time1),
hasStaleFlag: false,
},
// assert execute statement with select as of statement
{
sql: "execute p",
hasStaleFlag: true,
},
// assert prepare common select statement
{
sql: "prepare p1 from 'select * from t'",
hasStaleFlag: false,
},
{
sql: "execute p1",
hasStaleFlag: false,
},
// assert execute select statement in stale transaction
{
sql: fmt.Sprintf("start transaction read only as of timestamp '%v'", time1),
hasStaleFlag: false,
},
{
sql: "execute p1",
hasStaleFlag: true,
},
{
sql: "commit",
hasStaleFlag: false,
},
}

for _, testcase := range testcases {
failpoint.Enable("github.com/pingcap/tidb/exector/assertStmtCtxIsStaleness",
fmt.Sprintf("return(%v)", testcase.hasStaleFlag))
tk.MustExec(testcase.sql)
failpoint.Disable("github.com/pingcap/tidb/exector/assertStmtCtxIsStaleness")
// assert stale read flag should be false after each statement execution
c.Assert(tk.Se.GetSessionVars().StmtCtx.IsStaleness, IsFalse)
}
}
36 changes: 36 additions & 0 deletions executor/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package executor_test

import (
"bytes"
"fmt"
"math/rand"
"strings"
Expand Down Expand Up @@ -848,3 +849,38 @@ func (s *tiflashTestSuite) TestTiFlashPartitionTableBroadcastJoin(c *C) {
}
}
}

func (s *tiflashTestSuite) TestForbidTiflashDuringStaleRead(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a bigint(20))")
tk.MustExec("alter table t set tiflash replica 1")
tb := testGetTableByName(c, tk.Se, "test", "t")
err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)
time.Sleep(2 * time.Second)
tk.MustExec("insert into t values (9223372036854775807)")
tk.MustExec("insert into t values (9223372036854775807)")
tk.MustExec("insert into t values (9223372036854775807)")
tk.MustExec("insert into t values (9223372036854775807)")
tk.MustExec("insert into t values (9223372036854775807)")
tk.MustExec("insert into t values (9223372036854775807)")
rows := tk.MustQuery("explain select avg(a) from t").Rows()
resBuff := bytes.NewBufferString("")
for _, row := range rows {
fmt.Fprintf(resBuff, "%s\n", row)
}
res := resBuff.String()
c.Assert(strings.Contains(res, "tiflash"), IsTrue)
c.Assert(strings.Contains(res, "tikv"), IsFalse)
tk.MustExec("set transaction read only as of timestamp now(1)")
rows = tk.MustQuery("explain select avg(a) from t").Rows()
resBuff = bytes.NewBufferString("")
for _, row := range rows {
fmt.Fprintf(resBuff, "%s\n", row)
}
res = resBuff.String()
c.Assert(strings.Contains(res, "tiflash"), IsFalse)
c.Assert(strings.Contains(res, "tikv"), IsTrue)
}
7 changes: 5 additions & 2 deletions expression/builtin_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,14 @@ func (c *jsonUnquoteFunctionClass) getFunction(ctx sessionctx.Context, args []Ex
return sig, nil
}

func (b *builtinJSONUnquoteSig) evalString(row chunk.Row) (string, bool, error) {
str, isNull, err := b.args[0].EvalString(b.ctx, row)
func (b *builtinJSONUnquoteSig) evalString(row chunk.Row) (str string, isNull bool, err error) {
str, isNull, err = b.args[0].EvalString(b.ctx, row)
if isNull || err != nil {
return "", isNull, err
}
if len(str) >= 2 && str[0] == '"' && str[len(str)-1] == '"' && !goJSON.Valid([]byte(str)) {
return "", false, json.ErrInvalidJSONText.GenWithStackByArgs("The document root must not be followed by other values.")
}
str, err = json.UnquoteString(str)
if err != nil {
return "", false, err
Expand Down
52 changes: 30 additions & 22 deletions expression/builtin_json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package expression
import (
. "github.com/pingcap/check"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/types/json"
Expand Down Expand Up @@ -78,31 +79,38 @@ func (s *testEvaluatorSuite) TestJSONQuote(c *C) {
func (s *testEvaluatorSuite) TestJSONUnquote(c *C) {
fc := funcs[ast.JSONUnquote]
tbl := []struct {
Input interface{}
Expected interface{}
Input string
Result string
Error error
}{
{nil, nil},
{``, ``},
{`""`, ``},
{`''`, `''`},
{`"a"`, `a`},
{`3`, `3`},
{`{"a": "b"}`, `{"a": "b"}`},
{`{"a": "b"}`, `{"a": "b"}`},
{`"hello,\"quoted string\",world"`, `hello,"quoted string",world`},
{`"hello,\"宽字符\",world"`, `hello,"宽字符",world`},
{`Invalid Json string\tis OK`, `Invalid Json string\tis OK`},
{`"1\\u2232\\u22322"`, `1\u2232\u22322`},
{`"[{\"x\":\"{\\\"y\\\":12}\"}]"`, `[{"x":"{\"y\":12}"}]`},
{`[{\"x\":\"{\\\"y\\\":12}\"}]`, `[{\"x\":\"{\\\"y\\\":12}\"}]`},
{``, ``, nil},
{`""`, ``, nil},
{`''`, `''`, nil},
{`3`, `3`, nil},
{`{"a": "b"}`, `{"a": "b"}`, nil},
{`{"a": "b"}`, `{"a": "b"}`, nil},
{`"hello,\"quoted string\",world"`, `hello,"quoted string",world`, nil},
{`"hello,\"宽字符\",world"`, `hello,"宽字符",world`, nil},
{`Invalid Json string\tis OK`, `Invalid Json string\tis OK`, nil},
{`"1\\u2232\\u22322"`, `1\u2232\u22322`, nil},
{`"[{\"x\":\"{\\\"y\\\":12}\"}]"`, `[{"x":"{\"y\":12}"}]`, nil},
{`[{\"x\":\"{\\\"y\\\":12}\"}]`, `[{\"x\":\"{\\\"y\\\":12}\"}]`, nil},
{`"a"`, `a`, nil},
{`""a""`, `""a""`, json.ErrInvalidJSONText.GenWithStackByArgs("The document root must not be followed by other values.")},
{`"""a"""`, `"""a"""`, json.ErrInvalidJSONText.GenWithStackByArgs("The document root must not be followed by other values.")},
}
dtbl := tblToDtbl(tbl)
for _, t := range dtbl {
f, err := fc.getFunction(s.ctx, s.datumsToConstants(t["Input"]))
c.Assert(err, IsNil)
d, err := evalBuiltinFunc(f, chunk.Row{})
for _, t := range tbl {
var d types.Datum
d.SetString(t.Input, mysql.DefaultCollationName)
f, err := fc.getFunction(s.ctx, s.datumsToConstants([]types.Datum{d}))
c.Assert(err, IsNil)
c.Assert(d, testutil.DatumEquals, t["Expected"][0])
d, err = evalBuiltinFunc(f, chunk.Row{})
if t.Error == nil {
c.Assert(d.GetString(), Equals, t.Result)
c.Assert(err, IsNil)
} else {
c.Assert(err, ErrorMatches, ".*The document root must not be followed by other values.*")
}
}
}

Expand Down
6 changes: 5 additions & 1 deletion expression/builtin_json_vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1170,7 +1170,11 @@ func (b *builtinJSONUnquoteSig) vecEvalString(input *chunk.Chunk, result *chunk.
result.AppendNull()
continue
}
str, err := json.UnquoteString(buf.GetString(i))
str := buf.GetString(i)
if len(str) >= 2 && str[0] == '"' && str[len(str)-1] == '"' && !goJSON.Valid([]byte(str)) {
return json.ErrInvalidJSONText.GenWithStackByArgs("The document root must not be followed by other values.")
}
str, err := json.UnquoteString(str)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion expression/builtin_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -7037,7 +7037,7 @@ func (b *builtinLastDaySig) evalTime(row chunk.Row) (types.Time, bool, error) {
}
tm := arg
year, month := tm.Year(), tm.Month()
if arg.InvalidZero() {
if tm.Month() == 0 || (tm.Day() == 0 && b.ctx.GetSessionVars().SQLMode.HasNoZeroDateMode()) {
return types.ZeroTime, true, handleInvalidTimeError(b.ctx, types.ErrWrongValue.GenWithStackByArgs(types.DateTimeStr, arg.String()))
}
lastDay := types.GetLastDay(year, month)
Expand Down
33 changes: 23 additions & 10 deletions expression/builtin_time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2776,22 +2776,35 @@ func (s *testEvaluatorSuite) TestLastDay(c *C) {
c.Assert(result, Equals, test.expect)
}

testsNull := []interface{}{
"0000-00-00",
"1992-13-00",
"2007-10-07 23:59:61",
"2005-00-00",
"2005-00-01",
"2243-01 00:00:00",
123456789}
var timeData types.Time
timeData.StrToDate(s.ctx.GetSessionVars().StmtCtx, "202010", "%Y%m")
testsNull := []struct {
param interface{}
isNilNoZeroDate bool
isNil bool
}{
{"0000-00-00", true, true},
{"1992-13-00", true, true},
{"2007-10-07 23:59:61", true, true},
{"2005-00-00", true, true},
{"2005-00-01", true, true},
{"2243-01 00:00:00", true, true},
{123456789, true, true},
{timeData, true, false},
}

for _, i := range testsNull {
t := []types.Datum{types.NewDatum(i)}
t := []types.Datum{types.NewDatum(i.param)}
f, err := fc.getFunction(s.ctx, s.datumsToConstants(t))
c.Assert(err, IsNil)
d, err := evalBuiltinFunc(f, chunk.Row{})
c.Assert(err, IsNil)
c.Assert(d.IsNull(), IsTrue)
c.Assert(d.IsNull() == i.isNilNoZeroDate, IsTrue)
s.ctx.GetSessionVars().SQLMode &= ^mysql.ModeNoZeroDate
d, err = evalBuiltinFunc(f, chunk.Row{})
c.Assert(err, IsNil)
c.Assert(d.IsNull() == i.isNil, IsTrue)
s.ctx.GetSessionVars().SQLMode |= mysql.ModeNoZeroDate
}
}

Expand Down
Loading

0 comments on commit 7dff9fe

Please sign in to comment.