diff --git a/bindinfo/bind_test.go b/bindinfo/bind_test.go index a0acb7ba89cf4..1eb7e1478b2f9 100644 --- a/bindinfo/bind_test.go +++ b/bindinfo/bind_test.go @@ -249,108 +249,108 @@ var testSQLs = []struct { overlaySQL: "binding for select * from t where i>99 using select * from t use index(index_t) where i>99", querySQL: "select * from t where i > 30.0", originSQL: "select * from `test` . `t` where `i` > ?", - bindSQL: "SELECT * FROM test.t USE INDEX (index_t) WHERE i > 99", + bindSQL: "SELECT * FROM `test`.`t` USE INDEX (`index_t`) WHERE `i` > 99", dropSQL: "binding for select * from t where i>100", - memoryUsage: float64(118), + memoryUsage: float64(126), }, { createSQL: "binding for select * from t union all select * from t using select * from t use index(index_t) union all select * from t use index()", overlaySQL: "", querySQL: "select * from t union all select * from t", originSQL: "select * from `test` . `t` union all select * from `test` . `t`", - bindSQL: "SELECT * FROM test.t USE INDEX (index_t) UNION ALL SELECT * FROM test.t USE INDEX ()", + bindSQL: "SELECT * FROM `test`.`t` USE INDEX (`index_t`) UNION ALL SELECT * FROM `test`.`t` USE INDEX ()", dropSQL: "binding for select * from t union all select * from t", - memoryUsage: float64(172), + memoryUsage: float64(182), }, { createSQL: "binding for (select * from t) union all (select * from t) using (select * from t use index(index_t)) union all (select * from t use index())", overlaySQL: "", querySQL: "(select * from t) union all (select * from t)", originSQL: "( select * from `test` . `t` ) union all ( select * from `test` . `t` )", - bindSQL: "(SELECT * FROM test.t USE INDEX (index_t)) UNION ALL (SELECT * FROM test.t USE INDEX ())", + bindSQL: "(SELECT * FROM `test`.`t` USE INDEX (`index_t`)) UNION ALL (SELECT * FROM `test`.`t` USE INDEX ())", dropSQL: "binding for (select * from t) union all (select * from t)", - memoryUsage: float64(184), + memoryUsage: float64(194), }, { createSQL: "binding for select * from t intersect select * from t using select * from t use index(index_t) intersect select * from t use index()", overlaySQL: "", querySQL: "select * from t intersect select * from t", originSQL: "select * from `test` . `t` intersect select * from `test` . `t`", - bindSQL: "SELECT * FROM test.t USE INDEX (index_t) INTERSECT SELECT * FROM test.t USE INDEX ()", + bindSQL: "SELECT * FROM `test`.`t` USE INDEX (`index_t`) INTERSECT SELECT * FROM `test`.`t` USE INDEX ()", dropSQL: "binding for select * from t intersect select * from t", - memoryUsage: float64(172), + memoryUsage: float64(182), }, { createSQL: "binding for select * from t except select * from t using select * from t use index(index_t) except select * from t use index()", overlaySQL: "", querySQL: "select * from t except select * from t", originSQL: "select * from `test` . `t` except select * from `test` . `t`", - bindSQL: "SELECT * FROM test.t USE INDEX (index_t) EXCEPT SELECT * FROM test.t USE INDEX ()", + bindSQL: "SELECT * FROM `test`.`t` USE INDEX (`index_t`) EXCEPT SELECT * FROM `test`.`t` USE INDEX ()", dropSQL: "binding for select * from t except select * from t", - memoryUsage: float64(166), + memoryUsage: float64(176), }, { createSQL: "binding for select * from t using select /*+ use_index(t,index_t)*/ * from t", overlaySQL: "", querySQL: "select * from t ", originSQL: "select * from `test` . `t`", - bindSQL: "SELECT /*+ use_index(t index_t)*/ * FROM test.t", + bindSQL: "SELECT /*+ use_index(`t` `index_t`)*/ * FROM `test`.`t`", dropSQL: "binding for select * from t", - memoryUsage: float64(98), + memoryUsage: float64(106), }, { createSQL: "binding for delete from t where i = 1 using delete /*+ use_index(t,index_t) */ from t where i = 1", overlaySQL: "", querySQL: "delete from t where i = 2", originSQL: "delete from `test` . `t` where `i` = ?", - bindSQL: "DELETE /*+ use_index(t index_t)*/ FROM test.t WHERE i = 1", + bindSQL: "DELETE /*+ use_index(`t` `index_t`)*/ FROM `test`.`t` WHERE `i` = 1", dropSQL: "binding for delete from t where i = 1", - memoryUsage: float64(120), + memoryUsage: float64(130), }, { createSQL: "binding for delete t, t1 from t inner join t1 on t.s = t1.s where t.i = 1 using delete /*+ use_index(t,index_t), hash_join(t,t1) */ t, t1 from t inner join t1 on t.s = t1.s where t.i = 1", overlaySQL: "", querySQL: "delete t, t1 from t inner join t1 on t.s = t1.s where t.i = 2", originSQL: "delete `test` . `t` , `test` . `t1` from `test` . `t` join `test` . `t1` on `t` . `s` = `t1` . `s` where `t` . `i` = ?", - bindSQL: "DELETE /*+ use_index(t index_t) hash_join(t, t1)*/ test.t,test.t1 FROM test.t JOIN test.t1 ON t.s = t1.s WHERE t.i = 1", + bindSQL: "DELETE /*+ use_index(`t` `index_t`) hash_join(`t`, `t1`)*/ `test`.`t`,`test`.`t1` FROM `test`.`t` JOIN `test`.`t1` ON `t`.`s` = `t1`.`s` WHERE `t`.`i` = 1", dropSQL: "binding for delete t, t1 from t inner join t1 on t.s = t1.s where t.i = 1", - memoryUsage: float64(261), + memoryUsage: float64(297), }, { createSQL: "binding for update t set s = 'a' where i = 1 using update /*+ use_index(t,index_t) */ t set s = 'a' where i = 1", overlaySQL: "", querySQL: "update t set s='b' where i=2", originSQL: "update `test` . `t` set `s` = ? where `i` = ?", - bindSQL: "UPDATE /*+ use_index(t index_t)*/ test.t SET s='a' WHERE i = 1", + bindSQL: "UPDATE /*+ use_index(`t` `index_t`)*/ `test`.`t` SET `s`='a' WHERE `i` = 1", dropSQL: "binding for update t set s = 'a' where i = 1", - memoryUsage: float64(132), + memoryUsage: float64(144), }, { createSQL: "binding for update t, t1 set t.s = 'a' where t.i = t1.i using update /*+ inl_join(t1) */ t, t1 set t.s = 'a' where t.i = t1.i", overlaySQL: "", querySQL: "update t , t1 set t.s='b' where t.i=t1.i", originSQL: "update ( `test` . `t` ) join `test` . `t1` set `t` . `s` = ? where `t` . `i` = `t1` . `i`", - bindSQL: "UPDATE /*+ inl_join(t1)*/ (test.t) JOIN test.t1 SET t.s='a' WHERE t.i = t1.i", + bindSQL: "UPDATE /*+ inl_join(`t1`)*/ (`test`.`t`) JOIN `test`.`t1` SET `t`.`s`='a' WHERE `t`.`i` = `t1`.`i`", dropSQL: "binding for update t, t1 set t.s = 'a' where t.i = t1.i", - memoryUsage: float64(190), + memoryUsage: float64(212), }, { createSQL: "binding for insert into t1 select * from t where t.i = 1 using insert into t1 select /*+ use_index(t,index_t) */ * from t where t.i = 1", overlaySQL: "", querySQL: "insert into t1 select * from t where t.i = 2", originSQL: "insert into `test` . `t1` select * from `test` . `t` where `t` . `i` = ?", - bindSQL: "INSERT INTO test.t1 SELECT /*+ use_index(t index_t)*/ * FROM test.t WHERE t.i = 1", + bindSQL: "INSERT INTO `test`.`t1` SELECT /*+ use_index(`t` `index_t`)*/ * FROM `test`.`t` WHERE `t`.`i` = 1", dropSQL: "binding for insert into t1 select * from t where t.i = 1", - memoryUsage: float64(178), + memoryUsage: float64(194), }, { createSQL: "binding for replace into t1 select * from t where t.i = 1 using replace into t1 select /*+ use_index(t,index_t) */ * from t where t.i = 1", overlaySQL: "", querySQL: "replace into t1 select * from t where t.i = 2", originSQL: "replace into `test` . `t1` select * from `test` . `t` where `t` . `i` = ?", - bindSQL: "REPLACE INTO test.t1 SELECT /*+ use_index(t index_t)*/ * FROM test.t WHERE t.i = 1", + bindSQL: "REPLACE INTO `test`.`t1` SELECT /*+ use_index(`t` `index_t`)*/ * FROM `test`.`t` WHERE `t`.`i` = 1", dropSQL: "binding for replace into t1 select * from t where t.i = 1", - memoryUsage: float64(180), + memoryUsage: float64(196), }, } @@ -655,7 +655,7 @@ func (s *testSuite) TestBindingSymbolList(c *C) { c.Assert(bindData, NotNil) c.Check(bindData.OriginalSQL, Equals, "select `a` , `b` from `test` . `t` where `a` = ? limit ...") bind := bindData.Bindings[0] - c.Check(bind.BindSQL, Equals, "SELECT a,b FROM test.t USE INDEX (ib) WHERE a = 1 LIMIT 0,1") + c.Check(bind.BindSQL, Equals, "SELECT `a`,`b` FROM `test`.`t` USE INDEX (`ib`) WHERE `a` = 1 LIMIT 0,1") c.Check(bindData.Db, Equals, "test") c.Check(bind.Status, Equals, "using") c.Check(bind.Charset, NotNil) @@ -743,7 +743,7 @@ func (s *testSuite) TestBestPlanInBaselines(c *C) { c.Check(bindData, NotNil) c.Check(bindData.OriginalSQL, Equals, "select `a` , `b` from `test` . `t` where `a` = ? limit ...") bind := bindData.Bindings[0] - c.Check(bind.BindSQL, Equals, "SELECT /*+ use_index(@sel_1 test.t ia)*/ a,b FROM test.t WHERE a = 1 LIMIT 0,1") + c.Check(bind.BindSQL, Equals, "SELECT /*+ use_index(@`sel_1` `test`.`t` `ia`)*/ `a`,`b` FROM `test`.`t` WHERE `a` = 1 LIMIT 0,1") c.Check(bindData.Db, Equals, "test") c.Check(bind.Status, Equals, "using") @@ -775,7 +775,7 @@ func (s *testSuite) TestErrorBind(c *C) { c.Check(bindData, NotNil) c.Check(bindData.OriginalSQL, Equals, "select * from `test` . `t` where `i` > ?") bind := bindData.Bindings[0] - c.Check(bind.BindSQL, Equals, "SELECT * FROM test.t USE INDEX (index_t) WHERE i > 100") + c.Check(bind.BindSQL, Equals, "SELECT * FROM `test`.`t` USE INDEX (`index_t`) WHERE `i` > 100") c.Check(bindData.Db, Equals, "test") c.Check(bind.Status, Equals, "using") c.Check(bind.Charset, NotNil) @@ -913,13 +913,13 @@ func (s *testSuite) TestDMLCapturePlanBaseline(c *C) { rows = tk.MustQuery("show global bindings").Sort().Rows() c.Assert(len(rows), Equals, 4) c.Assert(rows[0][0], Equals, "delete from `test` . `t` where `b` = ? and `c` > ?") - c.Assert(rows[0][1], Equals, "DELETE /*+ use_index(@`del_1` `test`.`t` `idx_b`)*/ FROM `test`.`t` WHERE `b`=1 AND `c`>1") + c.Assert(rows[0][1], Equals, "DELETE /*+ use_index(@`del_1` `test`.`t` `idx_b`)*/ FROM `test`.`t` WHERE `b` = 1 AND `c` > 1") c.Assert(rows[1][0], Equals, "insert into `test` . `t1` select * from `test` . `t` where `t` . `b` = ? and `t` . `c` > ?") - c.Assert(rows[1][1], Equals, "INSERT INTO `test`.`t1` SELECT /*+ use_index(@`sel_1` `test`.`t` `idx_b`)*/ * FROM `test`.`t` WHERE `t`.`b`=1 AND `t`.`c`>1") + c.Assert(rows[1][1], Equals, "INSERT INTO `test`.`t1` SELECT /*+ use_index(@`sel_1` `test`.`t` `idx_b`)*/ * FROM `test`.`t` WHERE `t`.`b` = 1 AND `t`.`c` > 1") c.Assert(rows[2][0], Equals, "replace into `test` . `t1` select * from `test` . `t` where `t` . `b` = ? and `t` . `c` > ?") - c.Assert(rows[2][1], Equals, "REPLACE INTO `test`.`t1` SELECT /*+ use_index(@`sel_1` `test`.`t` `idx_b`)*/ * FROM `test`.`t` WHERE `t`.`b`=1 AND `t`.`c`>1") + c.Assert(rows[2][1], Equals, "REPLACE INTO `test`.`t1` SELECT /*+ use_index(@`sel_1` `test`.`t` `idx_b`)*/ * FROM `test`.`t` WHERE `t`.`b` = 1 AND `t`.`c` > 1") c.Assert(rows[3][0], Equals, "update `test` . `t` set `a` = ? where `b` = ? and `c` > ?") - c.Assert(rows[3][1], Equals, "UPDATE /*+ use_index(@`upd_1` `test`.`t` `idx_b`)*/ `test`.`t` SET `a`=1 WHERE `b`=1 AND `c`>1") + c.Assert(rows[3][1], Equals, "UPDATE /*+ use_index(@`upd_1` `test`.`t` `idx_b`)*/ `test`.`t` SET `a`=1 WHERE `b` = 1 AND `c` > 1") } func (s *testSuite) TestCapturePlanBaseline(c *C) { @@ -948,7 +948,7 @@ func (s *testSuite) TestCapturePlanBaseline(c *C) { rows = tk.MustQuery("show global bindings").Rows() c.Assert(len(rows), Equals, 1) c.Assert(rows[0][0], Equals, "select * from `test` . `t` where `a` > ?") - c.Assert(rows[0][1], Equals, "SELECT /*+ use_index(@`sel_1` `test`.`t` )*/ * FROM `test`.`t` WHERE `a`>10") + c.Assert(rows[0][1], Equals, "SELECT /*+ use_index(@`sel_1` `test`.`t` )*/ * FROM `test`.`t` WHERE `a` > 10") } func (s *testSuite) TestCaptureDBCaseSensitivity(c *C) { @@ -968,7 +968,7 @@ func (s *testSuite) TestCaptureDBCaseSensitivity(c *C) { // so there would be no new binding captured. rows := tk.MustQuery("show global bindings").Rows() c.Assert(len(rows), Equals, 1) - c.Assert(rows[0][1], Equals, "SELECT /*+ use_index(t )*/ * FROM SPM.t") + c.Assert(rows[0][1], Equals, "SELECT /*+ use_index(`t` )*/ * FROM `SPM`.`t`") c.Assert(rows[0][8], Equals, "manual") } @@ -1104,7 +1104,7 @@ func (s *testSuite) TestCapturePreparedStmt(c *C) { rows := tk.MustQuery("show global bindings").Rows() c.Assert(len(rows), Equals, 1) c.Assert(rows[0][0], Equals, "select * from `test` . `t` where `b` = ? and `c` > ?") - c.Assert(rows[0][1], Equals, "SELECT /*+ use_index(@`sel_1` `test`.`t` `idx_c`)*/ * FROM `test`.`t` WHERE `b`=? AND `c`>?") + c.Assert(rows[0][1], Equals, "SELECT /*+ use_index(@`sel_1` `test`.`t` `idx_c`)*/ * FROM `test`.`t` WHERE `b` = ? AND `c` > ?") c.Assert(tk.MustUseIndex("select /*+ use_index(t,idx_b) */ * from t where b = 1 and c > 1", "idx_c(c)"), IsTrue) tk.MustExec("admin flush bindings") @@ -1112,7 +1112,7 @@ func (s *testSuite) TestCapturePreparedStmt(c *C) { rows = tk.MustQuery("show global bindings").Rows() c.Assert(len(rows), Equals, 1) c.Assert(rows[0][0], Equals, "select * from `test` . `t` where `b` = ? and `c` > ?") - c.Assert(rows[0][1], Equals, "SELECT /*+ use_index(@`sel_1` `test`.`t` `idx_c`)*/ * FROM `test`.`t` WHERE `b`=? AND `c`>?") + c.Assert(rows[0][1], Equals, "SELECT /*+ use_index(@`sel_1` `test`.`t` `idx_c`)*/ * FROM `test`.`t` WHERE `b` = ? AND `c` > ?") } func (s *testSuite) TestDropSingleBindings(c *C) { @@ -1129,11 +1129,11 @@ func (s *testSuite) TestDropSingleBindings(c *C) { // The size of bindings is equal to one. Because for one normalized sql, // the `create binding` clears all the origin bindings. c.Assert(len(rows), Equals, 1) - c.Assert(rows[0][1], Equals, "SELECT * FROM test.t USE INDEX (idx_b)") + c.Assert(rows[0][1], Equals, "SELECT * FROM `test`.`t` USE INDEX (`idx_b`)") tk.MustExec("drop binding for select * from t using select * from t use index(idx_a)") rows = tk.MustQuery("show bindings").Rows() c.Assert(len(rows), Equals, 1) - c.Assert(rows[0][1], Equals, "SELECT * FROM test.t USE INDEX (idx_b)") + c.Assert(rows[0][1], Equals, "SELECT * FROM `test`.`t` USE INDEX (`idx_b`)") tk.MustExec("drop table t") tk.MustExec("drop binding for select * from t using select * from t use index(idx_b)") rows = tk.MustQuery("show bindings").Rows() @@ -1147,11 +1147,11 @@ func (s *testSuite) TestDropSingleBindings(c *C) { // The size of bindings is equal to one. Because for one normalized sql, // the `create binding` clears all the origin bindings. c.Assert(len(rows), Equals, 1) - c.Assert(rows[0][1], Equals, "SELECT * FROM test.t USE INDEX (idx_b)") + c.Assert(rows[0][1], Equals, "SELECT * FROM `test`.`t` USE INDEX (`idx_b`)") tk.MustExec("drop global binding for select * from t using select * from t use index(idx_a)") rows = tk.MustQuery("show global bindings").Rows() c.Assert(len(rows), Equals, 1) - c.Assert(rows[0][1], Equals, "SELECT * FROM test.t USE INDEX (idx_b)") + c.Assert(rows[0][1], Equals, "SELECT * FROM `test`.`t` USE INDEX (`idx_b`)") tk.MustExec("drop table t") tk.MustExec("drop global binding for select * from t using select * from t use index(idx_b)") rows = tk.MustQuery("show global bindings").Rows() @@ -1234,12 +1234,12 @@ func (s *testSuite) TestAddEvolveTasks(c *C) { tk.MustExec("admin flush bindings") rows := tk.MustQuery("show global bindings").Rows() c.Assert(len(rows), Equals, 2) - c.Assert(rows[1][1], Equals, "SELECT /*+ use_index(@`sel_1` `test`.`t` )*/ * FROM `test`.`t` WHERE `a`>=4 AND `b`>=1 AND `c`=0") + c.Assert(rows[1][1], Equals, "SELECT /*+ use_index(@`sel_1` `test`.`t` )*/ * FROM `test`.`t` WHERE `a` >= 4 AND `b` >= 1 AND `c` = 0") c.Assert(rows[1][3], Equals, "pending verify") tk.MustExec("admin evolve bindings") rows = tk.MustQuery("show global bindings").Rows() c.Assert(len(rows), Equals, 2) - c.Assert(rows[1][1], Equals, "SELECT /*+ use_index(@`sel_1` `test`.`t` )*/ * FROM `test`.`t` WHERE `a`>=4 AND `b`>=1 AND `c`=0") + c.Assert(rows[1][1], Equals, "SELECT /*+ use_index(@`sel_1` `test`.`t` )*/ * FROM `test`.`t` WHERE `a` >= 4 AND `b` >= 1 AND `c` = 0") status := rows[1][3].(string) c.Assert(status == "using" || status == "rejected", IsTrue) } @@ -1258,7 +1258,7 @@ func (s *testSuite) TestRuntimeHintsInEvolveTasks(c *C) { tk.MustExec("admin flush bindings") rows := tk.MustQuery("show global bindings").Rows() c.Assert(len(rows), Equals, 2) - c.Assert(rows[1][1], Equals, "SELECT /*+ use_index(@`sel_1` `test`.`t` `idx_c`)*/ * FROM `test`.`t` WHERE `a`>=4 AND `b`>=1 AND `c`=0") // MAX_EXECUTION_TIME is ignored + c.Assert(rows[1][1], Equals, "SELECT /*+ use_index(@`sel_1` `test`.`t` `idx_c`)*/ * FROM `test`.`t` WHERE `a` >= 4 AND `b` >= 1 AND `c` = 0") // MAX_EXECUTION_TIME is ignored s.cleanBindingEnv(tk) tk.MustExec("create global binding for select * from t where a >= 1 and b >= 1 and c = 0 using select /*+ MAX_EXECUTION_TIME(5000) */* from t use index(idx_a) where a >= 1 and b >= 1 and c = 0") @@ -1266,7 +1266,7 @@ func (s *testSuite) TestRuntimeHintsInEvolveTasks(c *C) { tk.MustExec("admin flush bindings") rows = tk.MustQuery("show global bindings").Rows() c.Assert(len(rows), Equals, 2) - c.Assert(rows[1][1], Equals, "SELECT /*+ use_index(@`sel_1` `test`.`t` `idx_c`), max_execution_time(5000)*/ * FROM `test`.`t` WHERE `a`>=4 AND `b`>=1 AND `c`=0") + c.Assert(rows[1][1], Equals, "SELECT /*+ use_index(@`sel_1` `test`.`t` `idx_c`), max_execution_time(5000)*/ * FROM `test`.`t` WHERE `a` >= 4 AND `b` >= 1 AND `c` = 0") } func (s *testSuite) TestBindingCache(c *C) { @@ -1450,7 +1450,7 @@ func (s *testSuite) TestEvolveInvalidBindings(c *C) { tk.MustExec("insert into mysql.bind_info values('select * from test . t where a > ?', 'SELECT /*+ USE_INDEX(t,idx_a) */ * FROM test.t WHERE a > 10', 'test', 'rejected', '2000-01-01 09:00:00', '2000-01-01 09:00:00', '', '','" + bindinfo.Manual + "')") tk.MustQuery("select bind_sql, status from mysql.bind_info where source != 'builtin'").Sort().Check(testkit.Rows( - "SELECT /*+ USE_INDEX(t )*/ * FROM test.t WHERE a > 10 using", + "SELECT /*+ USE_INDEX(`t` )*/ * FROM `test`.`t` WHERE `a` > 10 using", "SELECT /*+ USE_INDEX(t,idx_a) */ * FROM test.t WHERE a > 10 rejected", )) // Reload cache from mysql.bind_info. @@ -1463,7 +1463,7 @@ func (s *testSuite) TestEvolveInvalidBindings(c *C) { rows := tk.MustQuery("show global bindings").Sort().Rows() c.Assert(len(rows), Equals, 2) // Make sure this "using" binding is not overrided. - c.Assert(rows[0][1], Equals, "SELECT /*+ USE_INDEX(t )*/ * FROM test.t WHERE a > 10") + c.Assert(rows[0][1], Equals, "SELECT /*+ USE_INDEX(`t` )*/ * FROM `test`.`t` WHERE `a` > 10") status := rows[0][3].(string) c.Assert(status == "using", IsTrue) c.Assert(rows[1][1], Equals, "SELECT /*+ USE_INDEX(t,idx_a) */ * FROM test.t WHERE a > 10") @@ -1661,7 +1661,7 @@ func (s *testSuite) TestNotEvolvePlanForReadStorageHint(c *C) { rows = tk.MustQuery("show global bindings").Rows() // None evolve task, because of the origin binding is a read_from_storage binding. c.Assert(len(rows), Equals, 1) - c.Assert(rows[0][1], Equals, "SELECT /*+ read_from_storage(tiflash[t])*/ * FROM test.t WHERE a >= 1 AND b >= 1") + c.Assert(rows[0][1], Equals, "SELECT /*+ read_from_storage(tiflash[`t`])*/ * FROM `test`.`t` WHERE `a` >= 1 AND `b` >= 1") c.Assert(rows[0][3], Equals, "using") } @@ -1714,7 +1714,7 @@ func (s *testSuite) TestReCreateBindAfterEvolvePlan(c *C) { tk.MustExec("admin flush bindings") rows := tk.MustQuery("show global bindings").Rows() c.Assert(len(rows), Equals, 2) - c.Assert(rows[1][1], Equals, "SELECT /*+ use_index(@`sel_1` `test`.`t` )*/ * FROM `test`.`t` WHERE `a`>=0 AND `b`>=0") + c.Assert(rows[1][1], Equals, "SELECT /*+ use_index(@`sel_1` `test`.`t` )*/ * FROM `test`.`t` WHERE `a` >= 0 AND `b` >= 0") c.Assert(rows[1][3], Equals, "pending verify") tk.MustExec("create global binding for select * from t where a >= 1 and b >= 1 using select * from t use index(idx_b) where a >= 1 and b >= 1") @@ -1914,8 +1914,8 @@ func (s *testSuite) TestCapturedBindingCharset(c *C) { tk.MustExec("admin capture bindings") rows := tk.MustQuery("show global bindings").Rows() c.Assert(len(rows), Equals, 1) - c.Assert(rows[0][0], Equals, "update `test` . `t` set `name` = (_charset) ? where `name` <= (_charset) ?") - c.Assert(rows[0][1], Equals, "UPDATE /*+ use_index(@`upd_1` `test`.`t` `idx`)*/ `test`.`t` SET `name`=_ASCII'hello' WHERE `name`<=_ASCII'abc'") + c.Assert(rows[0][0], Equals, "update `test` . `t` set `name` = ? where `name` <= ?") + c.Assert(rows[0][1], Equals, "UPDATE /*+ use_index(@`upd_1` `test`.`t` `idx`)*/ `test`.`t` SET `name`='hello' WHERE `name` <= 'abc'") // Charset and Collation are empty now, they are not used currently. c.Assert(rows[0][6], Equals, "") c.Assert(rows[0][7], Equals, "") @@ -1958,7 +1958,7 @@ func (s *testSuite) TestUpdateSubqueryCapture(c *C) { tk.MustExec("admin capture bindings") rows := tk.MustQuery("show global bindings").Rows() c.Assert(len(rows), Equals, 1) - bindSQL := "UPDATE /*+ use_index(@`upd_1` `test`.`t1` `idx_b`), use_index(@`sel_1` `test`.`t2` ), hash_join(@`upd_1` `test`.`t1`), use_index(@`sel_2` `test`.`t2` )*/ `test`.`t1` SET `b`=1 WHERE `b`=2 AND (`a` IN (SELECT `a` FROM `test`.`t2` WHERE `b`=1) OR `c` IN (SELECT `a` FROM `test`.`t2` WHERE `b`=1))" + bindSQL := "UPDATE /*+ use_index(@`upd_1` `test`.`t1` `idx_b`), use_index(@`sel_1` `test`.`t2` ), hash_join(@`upd_1` `test`.`t1`), use_index(@`sel_2` `test`.`t2` )*/ `test`.`t1` SET `b`=1 WHERE `b` = 2 AND (`a` IN (SELECT `a` FROM `test`.`t2` WHERE `b` = 1) OR `c` IN (SELECT `a` FROM `test`.`t2` WHERE `b` = 1))" c.Assert(rows[0][1], Equals, bindSQL) tk.MustExec(bindSQL) c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 0) @@ -1984,7 +1984,7 @@ func (s *testSuite) TestIssue20417(c *C) { rows := tk.MustQuery("show global bindings").Rows() c.Assert(len(rows), Equals, 1) c.Assert(rows[0][0], Equals, "select * from `test` . `t`") - c.Assert(rows[0][1], Equals, "SELECT /*+ use_index(t idxb)*/ * FROM test.t") + c.Assert(rows[0][1], Equals, "SELECT /*+ use_index(`t` `idxb`)*/ * FROM `test`.`t`") c.Assert(tk.MustUseIndex("select * from t", "idxb(b)"), IsTrue) c.Assert(tk.MustUseIndex("select * from test.t", "idxb(b)"), IsTrue) @@ -2004,7 +2004,7 @@ func (s *testSuite) TestIssue20417(c *C) { rows = tk.MustQuery("show global bindings").Rows() c.Assert(len(rows), Equals, 1) c.Assert(rows[0][0], Equals, "select * from `test` . `t` where `b` = ? and `c` = ?") - c.Assert(rows[0][1], Equals, "SELECT /*+ use_index(@`sel_1` `test`.`t` `idxb`)*/ * FROM `test`.`t` WHERE `b`=2 AND `c`=213124") + c.Assert(rows[0][1], Equals, "SELECT /*+ use_index(@`sel_1` `test`.`t` `idxb`)*/ * FROM `test`.`t` WHERE `b` = 2 AND `c` = 213124") tk.MustExec("set @@tidb_capture_plan_baselines = off") // Test for evolve baseline @@ -2014,20 +2014,20 @@ func (s *testSuite) TestIssue20417(c *C) { rows = tk.MustQuery("show global bindings").Rows() c.Assert(len(rows), Equals, 1) c.Assert(rows[0][0], Equals, "select * from `test` . `t` where `c` = ?") - c.Assert(rows[0][1], Equals, "SELECT /*+ use_index(@sel_1 test.t idxb)*/ * FROM test.t WHERE c = 3924541") + c.Assert(rows[0][1], Equals, "SELECT /*+ use_index(@`sel_1` `test`.`t` `idxb`)*/ * FROM `test`.`t` WHERE `c` = 3924541") tk.MustExec("select /*+ use_index(t idxc)*/ * from t where c=3924541") c.Assert(tk.Se.GetSessionVars().StmtCtx.IndexNames[0], Equals, "t:idxb") tk.MustExec("admin flush bindings") rows = tk.MustQuery("show global bindings").Rows() c.Assert(len(rows), Equals, 2) c.Assert(rows[1][0], Equals, "select * from `test` . `t` where `c` = ?") - c.Assert(rows[1][1], Equals, "SELECT /*+ use_index(@`sel_1` `test`.`t` `idxc`), use_index(`t` `idxc`)*/ * FROM `test`.`t` WHERE `c`=3924541") + c.Assert(rows[1][1], Equals, "SELECT /*+ use_index(@`sel_1` `test`.`t` `idxc`), use_index(`t` `idxc`)*/ * FROM `test`.`t` WHERE `c` = 3924541") c.Assert(rows[1][3], Equals, "pending verify") tk.MustExec("admin evolve bindings") rows = tk.MustQuery("show global bindings").Rows() c.Assert(len(rows), Equals, 2) c.Assert(rows[1][0], Equals, "select * from `test` . `t` where `c` = ?") - c.Assert(rows[1][1], Equals, "SELECT /*+ use_index(@`sel_1` `test`.`t` `idxc`), use_index(`t` `idxc`)*/ * FROM `test`.`t` WHERE `c`=3924541") + c.Assert(rows[1][1], Equals, "SELECT /*+ use_index(@`sel_1` `test`.`t` `idxc`), use_index(`t` `idxc`)*/ * FROM `test`.`t` WHERE `c` = 3924541") status := rows[1][3].(string) c.Assert(status == "using" || status == "rejected", IsTrue) tk.MustExec("set @@tidb_evolve_plan_baselines=0") @@ -2079,3 +2079,16 @@ func (s *testSuite) TestSPMWithoutUseDatabase(c *C) { tk1.MustExec("select * from test.t") tk1.MustQuery(`select @@last_plan_from_binding;`).Check(testkit.Rows("1")) } + +func (s *testSuite) TestBindingWithoutCharset(c *C) { + tk := testkit.NewTestKit(c, s.store) + s.cleanBindingEnv(tk) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (a varchar(10) CHARACTER SET utf8)") + tk.MustExec("create global binding for select * from t where a = 'aa' using select * from t where a = 'aa'") + rows := tk.MustQuery("show global bindings").Rows() + c.Assert(len(rows), Equals, 1) + c.Assert(rows[0][0], Equals, "select * from `test` . `t` where `a` = ?") + c.Assert(rows[0][1], Equals, "SELECT * FROM `test`.`t` WHERE `a` = 'aa'") +} diff --git a/bindinfo/handle.go b/bindinfo/handle.go index 61148d42712b8..6111910395d55 100644 --- a/bindinfo/handle.go +++ b/bindinfo/handle.go @@ -25,7 +25,6 @@ import ( "github.com/pingcap/parser" "github.com/pingcap/parser/ast" - "github.com/pingcap/parser/format" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/metrics" @@ -689,14 +688,10 @@ func GenerateBindSQL(ctx context.Context, stmtNode ast.StmtNode, planHint string // We need to evolve plan based on the current sql, not the original sql which may have different parameters. // So here we would remove the hint and inject the current best plan hint. hint.BindHint(stmtNode, &hint.HintsSet{}) - var sb strings.Builder - restoreCtx := format.NewRestoreCtx(format.DefaultRestoreFlags, &sb) - restoreCtx.DefaultDB = defaultDB - err := stmtNode.Restore(restoreCtx) - if err != nil { - logutil.Logger(ctx).Debug("[sql-bind] restore SQL failed when generating bind SQL", zap.Error(err)) + bindSQL := utilparser.RestoreWithDefaultDB(stmtNode, defaultDB, "") + if bindSQL == "" { + return "" } - bindSQL := sb.String() switch n := stmtNode.(type) { case *ast.DeleteStmt: deleteIdx := strings.Index(bindSQL, "DELETE") diff --git a/executor/aggregate.go b/executor/aggregate.go index 09186bb4745b4..66556266c1627 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -229,6 +229,7 @@ func (e *HashAggExec) Close() error { e.childResult = nil e.groupSet, _ = set.NewStringSetWithMemoryUsage() e.partialResultMap = nil + e.memTracker.ReplaceBytesUsed(0) return e.baseExecutor.Close() } // `Close` may be called after `Open` without calling `Next` in test. @@ -255,6 +256,7 @@ func (e *HashAggExec) Close() error { for range e.finalOutputCh { } e.executed = false + e.memTracker.ReplaceBytesUsed(0) return e.baseExecutor.Close() } diff --git a/executor/analyze.go b/executor/analyze.go index d1166d0cbbc4c..923e8e12b27a6 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -1146,7 +1146,7 @@ func (e *AnalyzeFastExec) handleScanIter(iter kv.Iterator) (scanKeysSize int, er } func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err error) { - snapshot := e.ctx.GetStore().(tikv.Storage).GetSnapshot(kv.MaxVersion) + snapshot := e.ctx.GetStore().GetSnapshot(kv.MaxVersion) if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } @@ -1166,7 +1166,7 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) { defer e.wg.Done() - snapshot := e.ctx.GetStore().(tikv.Storage).GetSnapshot(kv.MaxVersion) + snapshot := e.ctx.GetStore().GetSnapshot(kv.MaxVersion) snapshot.SetOption(kv.NotFillCache, true) snapshot.SetOption(kv.IsolationLevel, kv.RC) snapshot.SetOption(kv.Priority, kv.PriorityLow) diff --git a/executor/mpp_gather.go b/executor/mpp_gather.go index a8628330e16ef..107d3501c6d5f 100644 --- a/executor/mpp_gather.go +++ b/executor/mpp_gather.go @@ -24,13 +24,14 @@ import ( plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/collate" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tipb/go-tipb" "go.uber.org/zap" ) func useMPPExecution(ctx sessionctx.Context, tr *plannercore.PhysicalTableReader) bool { - if !ctx.GetSessionVars().AllowMPPExecution { + if !ctx.GetSessionVars().AllowMPPExecution || collate.NewCollationEnabled() { return false } _, ok := tr.GetTablePlan().(*plannercore.PhysicalExchangeSender) diff --git a/executor/set_test.go b/executor/set_test.go index 5d90923995f0c..b55ed2328af74 100644 --- a/executor/set_test.go +++ b/executor/set_test.go @@ -518,16 +518,21 @@ func (s *testSerialSuite1) TestSetVar(c *C) { tk.MustExec("SET GLOBAL tidb_enable_extended_stats = off") tk.MustQuery("select @@global.tidb_enable_extended_stats").Check(testkit.Rows("0")) - tk.MustExec("SET SESSION tidb_enable_tiflash_fallback_tikv = on") - tk.MustQuery("select @@session.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("1")) - tk.MustExec("SET SESSION tidb_enable_tiflash_fallback_tikv = off") - tk.MustQuery("select @@session.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("0")) - tk.MustExec("SET GLOBAL tidb_enable_tiflash_fallback_tikv = on") - tk.MustQuery("select @@global.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("1")) - tk.MustExec("SET GLOBAL tidb_enable_tiflash_fallback_tikv = off") - tk.MustQuery("select @@global.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("0")) - c.Assert(tk.ExecToErr("SET SESSION tidb_enable_tiflash_fallback_tikv = 123"), NotNil) - c.Assert(tk.ExecToErr("SET GLOBAL tidb_enable_tiflash_fallback_tikv = 321"), NotNil) + tk.MustExec("SET SESSION tidb_allow_fallback_to_tikv = 'tiflash'") + tk.MustQuery("select @@session.tidb_allow_fallback_to_tikv").Check(testkit.Rows("tiflash")) + tk.MustExec("SET SESSION tidb_allow_fallback_to_tikv = ''") + tk.MustQuery("select @@session.tidb_allow_fallback_to_tikv").Check(testkit.Rows("")) + tk.MustExec("SET GLOBAL tidb_allow_fallback_to_tikv = 'tiflash'") + tk.MustQuery("select @@global.tidb_allow_fallback_to_tikv").Check(testkit.Rows("tiflash")) + tk.MustExec("SET GLOBAL tidb_allow_fallback_to_tikv = ''") + tk.MustQuery("select @@global.tidb_allow_fallback_to_tikv").Check(testkit.Rows("")) + tk.MustExec("set @@tidb_allow_fallback_to_tikv = 'tiflash, tiflash, tiflash'") + tk.MustQuery("select @@tidb_allow_fallback_to_tikv").Check(testkit.Rows("tiflash")) + + tk.MustGetErrMsg("SET SESSION tidb_allow_fallback_to_tikv = 'tikv,tiflash'", "[variable:1231]Variable 'tidb_allow_fallback_to_tikv' can't be set to the value of 'tikv,tiflash'") + tk.MustGetErrMsg("SET GLOBAL tidb_allow_fallback_to_tikv = 'tikv,tiflash'", "[variable:1231]Variable 'tidb_allow_fallback_to_tikv' can't be set to the value of 'tikv,tiflash'") + tk.MustGetErrMsg("set @@tidb_allow_fallback_to_tikv = 'tidb, tiflash, tiflash'", "[variable:1231]Variable 'tidb_allow_fallback_to_tikv' can't be set to the value of 'tidb, tiflash, tiflash'") + tk.MustGetErrMsg("set @@tidb_allow_fallback_to_tikv = 'unknown, tiflash, tiflash'", "[variable:1231]Variable 'tidb_allow_fallback_to_tikv' can't be set to the value of 'unknown, tiflash, tiflash'") // Test issue #22145 tk.MustExec(`set global sync_relay_log = "'"`) diff --git a/executor/tiflash_test.go b/executor/tiflash_test.go index 20679644dabb6..78b745fb62dfe 100644 --- a/executor/tiflash_test.go +++ b/executor/tiflash_test.go @@ -33,7 +33,6 @@ import ( "github.com/pingcap/tidb/store/mockstore/unistore" "github.com/pingcap/tidb/store/tikv/mockstore/cluster" "github.com/pingcap/tidb/util/testkit" - "github.com/pingcap/tidb/util/testleak" ) type tiflashTestSuite struct { @@ -43,7 +42,6 @@ type tiflashTestSuite struct { } func (s *tiflashTestSuite) SetUpSuite(c *C) { - testleak.BeforeTest() var err error s.store, err = mockstore.NewMockStore( mockstore.WithClusterInspector(func(c cluster.Cluster) { @@ -280,6 +278,23 @@ func (s *tiflashTestSuite) TestPartitionTable(c *C) { failpoint.Disable("github.com/pingcap/tidb/executor/checkUseMPP") } +func (s *tiflashTestSuite) TestMppEnum(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int not null primary key, b enum('aca','bca','zca'))") + tk.MustExec("alter table t set tiflash replica 1") + tb := testGetTableByName(c, tk.Se, "test", "t") + err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true) + c.Assert(err, IsNil) + tk.MustExec("insert into t values(1,'aca')") + tk.MustExec("insert into t values(2,'bca')") + tk.MustExec("insert into t values(3,'zca')") + tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"") + tk.MustExec("set @@session.tidb_allow_mpp=ON") + tk.MustQuery("select t1.b from t t1 join t t2 on t1.a = t2.a order by t1.b").Check(testkit.Rows("aca", "bca", "zca")) +} + func (s *tiflashTestSuite) TestCancelMppTasks(c *C) { var hang = "github.com/pingcap/tidb/store/mockstore/unistore/mppRecvHang" tk := testkit.NewTestKit(c, s.store) @@ -314,7 +329,6 @@ func (s *tiflashTestSuite) TestCancelMppTasks(c *C) { // all goroutines exit if one goroutine hangs but another return errors func (s *tiflashTestSuite) TestMppGoroutinesExitFromErrors(c *C) { - defer testleak.AfterTest(c)() // mock non-root tasks return error var mppNonRootTaskError = "github.com/pingcap/tidb/store/copr/mppNonRootTaskError" // mock root tasks hang diff --git a/expression/expr_to_pb_test.go b/expression/expr_to_pb_test.go index 4299e637b6df1..b215fc499cd1f 100644 --- a/expression/expr_to_pb_test.go +++ b/expression/expr_to_pb_test.go @@ -629,6 +629,11 @@ func (s *testEvaluatorSuite) TestExprPushDownToFlash(c *C) { c.Assert(err, IsNil) exprs = append(exprs, function) + // ExtractDatetime: can be pushed + function, err = NewFunction(mock.NewContext(), ast.Extract, types.NewFieldType(mysql.TypeLonglong), stringColumn, datetimeColumn) + c.Assert(err, IsNil) + exprs = append(exprs, function) + // CastIntAsInt function, err = NewFunction(mock.NewContext(), ast.Cast, types.NewFieldType(mysql.TypeLonglong), intColumn) c.Assert(err, IsNil) @@ -725,9 +730,15 @@ func (s *testEvaluatorSuite) TestExprPushDownToFlash(c *C) { function, err = NewFunction(mock.NewContext(), ast.JSONDepth, types.NewFieldType(mysql.TypeLonglong), jsonColumn) c.Assert(err, IsNil) exprs = append(exprs, function) + + // ExtractDatetimeFromString: can not be pushed + function, err = NewFunction(mock.NewContext(), ast.Extract, types.NewFieldType(mysql.TypeLonglong), stringColumn, stringColumn) + c.Assert(err, IsNil) + exprs = append(exprs, function) + pushed, remained := PushDownExprs(sc, exprs, client, kv.TiFlash) - c.Assert(len(pushed), Equals, len(exprs)-1) - c.Assert(len(remained), Equals, 1) + c.Assert(len(pushed), Equals, len(exprs)-2) + c.Assert(len(remained), Equals, 2) } func (s *testEvaluatorSuite) TestExprOnlyPushDownToFlash(c *C) { diff --git a/expression/expression.go b/expression/expression.go index 6370926f2dd4d..26bf0fb6e18bf 100644 --- a/expression/expression.go +++ b/expression/expression.go @@ -1070,6 +1070,7 @@ func canFuncBePushed(sf *ScalarFunction, storeType kv.StoreType) bool { ast.TimestampDiff, ast.DateAdd, ast.FromUnixTime, + ast.Extract, // encryption functions. ast.MD5, @@ -1243,7 +1244,7 @@ func CanExprsPushDown(sc *stmtctx.StatementContext, exprs []Expression, client k func scalarExprSupportedByTiKV(function *ScalarFunction) bool { switch function.FuncName.L { case ast.Substr, ast.Substring, ast.DateAdd, ast.TimestampDiff, - ast.FromUnixTime: + ast.FromUnixTime, ast.Extract: return false default: return true @@ -1295,6 +1296,13 @@ func scalarExprSupportedByFlash(function *ScalarFunction) bool { default: return false } + case ast.Extract: + switch function.Function.PbCode() { + case tipb.ScalarFuncSig_ExtractDatetime: + return true + default: + return false + } default: return false } diff --git a/expression/integration_test.go b/expression/integration_test.go index 440067d3ca9e0..f19afccb3d906 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -6301,6 +6301,15 @@ func (s *testIntegrationSerialSuite) TestCollationBasic(c *C) { tk.MustQuery("select a from t_ci where a='A'").Check(testkit.Rows("a")) tk.MustQuery("select a from t_ci where a='a '").Check(testkit.Rows("a")) tk.MustQuery("select a from t_ci where a='a '").Check(testkit.Rows("a")) + + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(c set('A', 'B') collate utf8mb4_general_ci);") + tk.MustExec("insert into t values('a');") + tk.MustExec("insert into t values('B');") + tk.MustQuery("select c from t where c = 'a';").Check(testkit.Rows("A")) + tk.MustQuery("select c from t where c = 'A';").Check(testkit.Rows("A")) + tk.MustQuery("select c from t where c = 'b';").Check(testkit.Rows("B")) + tk.MustQuery("select c from t where c = 'B';").Check(testkit.Rows("B")) } func (s *testIntegrationSerialSuite) TestWeightString(c *C) { diff --git a/go.mod b/go.mod index 5a53c7b55cb36..30bfb7bb5e87c 100644 --- a/go.mod +++ b/go.mod @@ -46,10 +46,10 @@ require ( github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 github.com/pingcap/kvproto v0.0.0-20210223121704-3cd2fc5fad22 github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8 - github.com/pingcap/parser v0.0.0-20210303061548-f6776f61e268 + github.com/pingcap/parser v0.0.0-20210310110710-c7333a4927e6 github.com/pingcap/sysutil v0.0.0-20210221112134-a07bda3bde99 github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible - github.com/pingcap/tipb v0.0.0-20210308034246-066a76fd4e1b + github.com/pingcap/tipb v0.0.0-20210309080453-72c4feaa6da7 github.com/prometheus/client_golang v1.5.1 github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.9.1 diff --git a/go.sum b/go.sum index 0636203d83bd1..3212d3d49a49b 100644 --- a/go.sum +++ b/go.sum @@ -415,15 +415,15 @@ github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIf github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8 h1:M+DNpOu/I3uDmwee6vcnoPd6GgSMqND4gxvDQ/W584U= github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= -github.com/pingcap/parser v0.0.0-20210303061548-f6776f61e268 h1:yWlvSEhQPDVQU6pgFZv5sEWf94t/dUAMuBRFmLgkpek= -github.com/pingcap/parser v0.0.0-20210303061548-f6776f61e268/go.mod h1:GbEr2PgY72/4XqPZzmzstlOU/+il/wrjeTNFs6ihsSE= +github.com/pingcap/parser v0.0.0-20210310110710-c7333a4927e6 h1:V/6ioJmVUN4q6/aUpNdnT6OOPc48R3tnojcVfTrt4QU= +github.com/pingcap/parser v0.0.0-20210310110710-c7333a4927e6/go.mod h1:GbEr2PgY72/4XqPZzmzstlOU/+il/wrjeTNFs6ihsSE= github.com/pingcap/sysutil v0.0.0-20200206130906-2bfa6dc40bcd/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI= github.com/pingcap/sysutil v0.0.0-20210221112134-a07bda3bde99 h1:/ogXgm4guJzow4UafiyXZ6ciAIPzxImaXYiFvTpKzKY= github.com/pingcap/sysutil v0.0.0-20210221112134-a07bda3bde99/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI= github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible h1:ceznmu/lLseGHP/jKyOa/3u/5H3wtLLLqkH2V3ssSjg= github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= -github.com/pingcap/tipb v0.0.0-20210308034246-066a76fd4e1b h1:AvGm1DqSEwbGgiiu3KVuTtwLl3MqhbwwnJpx82l6/7M= -github.com/pingcap/tipb v0.0.0-20210308034246-066a76fd4e1b/go.mod h1:nsEhnMokcn7MRqd2J60yxpn/ac3ZH8A6GOJ9NslabUo= +github.com/pingcap/tipb v0.0.0-20210309080453-72c4feaa6da7 h1:j8MkWmy5tduhHVsdsgZJugN1U9OWTMSBQoZIpn8kqPc= +github.com/pingcap/tipb v0.0.0-20210309080453-72c4feaa6da7/go.mod h1:nsEhnMokcn7MRqd2J60yxpn/ac3ZH8A6GOJ9NslabUo= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -844,7 +844,6 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= -honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.1.3 h1:qTakTkI6ni6LFD5sBwwsdSO+AQqbSIxOauHTTQKZ/7o= honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las= diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index d33ef05518726..bae99d548ae5f 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -1663,7 +1663,7 @@ func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P return nil, false } joins := make([]PhysicalPlan, 0, 8) - if p.ctx.GetSessionVars().AllowMPPExecution { + if p.ctx.GetSessionVars().AllowMPPExecution && !collate.NewCollationEnabled() { if p.shouldUseMPPBCJ() { mppJoins := p.tryToGetMppHashJoin(prop, true) if (p.preferJoinType & preferBCJoin) > 0 { @@ -1712,6 +1712,29 @@ func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P return joins, true } +func canExprsInJoinPushdown(p *LogicalJoin, storeType kv.StoreType) bool { + equalExprs := make([]expression.Expression, 0, len(p.EqualConditions)) + for _, eqCondition := range p.EqualConditions { + if eqCondition.FuncName.L == ast.NullEQ { + return false + } + equalExprs = append(equalExprs, eqCondition) + } + if !expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, equalExprs, p.ctx.GetClient(), storeType) { + return false + } + if !expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, p.LeftConditions, p.ctx.GetClient(), storeType) { + return false + } + if !expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, p.RightConditions, p.ctx.GetClient(), storeType) { + return false + } + if !expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, p.OtherConditions, p.ctx.GetClient(), storeType) { + return false + } + return true +} + func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBCJ bool) []PhysicalPlan { if !prop.IsEmpty() { return nil @@ -1726,10 +1749,10 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC if prop.PartitionTp == property.BroadcastType { return nil } - lkeys, rkeys, _, nullEQ := p.GetJoinKeys() - if nullEQ { + if !canExprsInJoinPushdown(p, kv.TiFlash) { return nil } + lkeys, rkeys, _, _ := p.GetJoinKeys() // check match property baseJoin := basePhysicalJoin{ JoinType: p.JoinType, @@ -1810,8 +1833,7 @@ func (p *LogicalJoin) tryToGetBroadCastJoin(prop *property.PhysicalProperty) []P if prop.TaskTp != property.RootTaskType && !prop.IsFlashProp() { return nil } - _, _, _, hasNullEQ := p.GetJoinKeys() - if hasNullEQ { + if !canExprsInJoinPushdown(p, kv.TiFlash) { return nil } @@ -1828,16 +1850,6 @@ func (p *LogicalJoin) tryToGetBroadCastJoin(prop *property.PhysicalProperty) []P return nil } - if !expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, p.LeftConditions, p.ctx.GetClient(), kv.TiFlash) { - return nil - } - if !expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, p.RightConditions, p.ctx.GetClient(), kv.TiFlash) { - return nil - } - if !expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, p.OtherConditions, p.ctx.GetClient(), kv.TiFlash) { - return nil - } - // for left/semi/anti-semi join the global idx must be 1, and for right join the global idx must be 0 if hasPrefer, idx := p.getPreferredBCJLocalIndex(); hasPrefer { if (idx == 0 && p.JoinType == RightOuterJoin) || (idx == 1 && (p.JoinType == LeftOuterJoin || p.JoinType == SemiJoin || p.JoinType == AntiSemiJoin)) { @@ -2110,7 +2122,7 @@ func (p *baseLogicalPlan) canChildPushDown() bool { return true case *LogicalJoin, *LogicalProjection: // TiFlash supports pushing down more operators - return p.SCtx().GetSessionVars().AllowBCJ || p.SCtx().GetSessionVars().AllowMPPExecution + return p.SCtx().GetSessionVars().AllowBCJ || (p.SCtx().GetSessionVars().AllowMPPExecution && !collate.NewCollationEnabled()) default: return false } @@ -2334,7 +2346,7 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy if la.ctx.GetSessionVars().AllowBCJ { taskTypes = append(taskTypes, property.CopTiFlashLocalReadTaskType) } - canPushDownToMPP := la.ctx.GetSessionVars().AllowMPPExecution && la.checkCanPushDownToMPP() + canPushDownToMPP := la.ctx.GetSessionVars().AllowMPPExecution && !collate.NewCollationEnabled() && la.checkCanPushDownToMPP() if canPushDownToMPP { taskTypes = append(taskTypes, property.MppTaskType) } diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index c26f6884aff7f..e4e8d55975a6a 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -557,6 +557,116 @@ func (s *testIntegrationSerialSuite) TestBroadcastJoin(c *C) { c.Assert(err.Error(), Equals, "[planner:1815]Internal : Can't find a proper physical plan for this query") } +func (s *testIntegrationSerialSuite) TestJoinNotSupportedByTiFlash(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists table_1") + tk.MustExec("create table table_1(id int not null, bit_col bit(2) not null, datetime_col datetime not null)") + tk.MustExec("insert into table_1 values(1,b'1','2020-01-01 00:00:00'),(2,b'0','2020-01-01 00:00:00')") + tk.MustExec("analyze table table_1") + + // Create virtual tiflash replica info. + dom := domain.GetDomain(tk.Se) + is := dom.InfoSchema() + db, exists := is.SchemaByName(model.NewCIStr("test")) + c.Assert(exists, IsTrue) + for _, tblInfo := range db.Tables { + if tblInfo.Name.L == "table_1" { + tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ + Count: 1, + Available: true, + } + } + } + + tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'") + tk.MustExec("set @@session.tidb_allow_mpp = 1") + var input []string + var output []struct { + SQL string + Plan []string + } + s.testData.GetTestCases(c, &input, &output) + for i, tt := range input { + s.testData.OnRecord(func() { + output[i].SQL = tt + output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + }) + res := tk.MustQuery(tt) + res.Check(testkit.Rows(output[i].Plan...)) + } + + tk.MustExec("set @@session.tidb_broadcast_join_threshold_size = 1") + tk.MustExec("set @@session.tidb_broadcast_join_threshold_count = 1") + s.testData.GetTestCases(c, &input, &output) + for i, tt := range input { + s.testData.OnRecord(func() { + output[i].SQL = tt + output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + }) + res := tk.MustQuery(tt) + res.Check(testkit.Rows(output[i].Plan...)) + } + + tk.MustExec("set @@session.tidb_allow_mpp = 0") + tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'") + tk.MustExec("set @@session.tidb_allow_batch_cop = 1") + tk.MustExec("set @@session.tidb_opt_broadcast_join = 1") + // make cbo force choose broadcast join since sql hint does not work for semi/anti-semi join + tk.MustExec("set @@session.tidb_opt_cpu_factor=10000000;") + s.testData.GetTestCases(c, &input, &output) + for i, tt := range input { + s.testData.OnRecord(func() { + output[i].SQL = tt + output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + }) + res := tk.MustQuery(tt) + res.Check(testkit.Rows(output[i].Plan...)) + } +} + +func (s *testIntegrationSerialSuite) TestMPPNotSupportedInNewCollation(c *C) { + defer collate.SetNewCollationEnabledForTest(false) + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists table_1") + tk.MustExec("create table table_1(id int not null, value int)") + tk.MustExec("insert into table_1 values(1,1),(2,2)") + tk.MustExec("analyze table table_1") + + // Create virtual tiflash replica info. + dom := domain.GetDomain(tk.Se) + is := dom.InfoSchema() + db, exists := is.SchemaByName(model.NewCIStr("test")) + c.Assert(exists, IsTrue) + for _, tblInfo := range db.Tables { + if tblInfo.Name.L == "table_1" { + tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ + Count: 1, + Available: true, + } + } + } + + collate.SetNewCollationEnabledForTest(true) + tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'") + tk.MustExec("set @@session.tidb_allow_mpp = 1") + var input []string + var output []struct { + SQL string + Plan []string + } + s.testData.GetTestCases(c, &input, &output) + for i, tt := range input { + s.testData.OnRecord(func() { + output[i].SQL = tt + output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + }) + res := tk.MustQuery(tt) + res.Check(testkit.Rows(output[i].Plan...)) + } +} + func (s *testIntegrationSerialSuite) TestAggPushDownEngine(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/planner/core/plan_to_pb.go b/planner/core/plan_to_pb.go index f199c3dd9cf19..185e750c0974c 100644 --- a/planner/core/plan_to_pb.go +++ b/planner/core/plan_to_pb.go @@ -16,6 +16,7 @@ package core import ( "github.com/pingcap/errors" "github.com/pingcap/parser/model" + "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" @@ -197,9 +198,6 @@ func (p *PhysicalTableScan) ToPB(ctx sessionctx.Context, storeType kv.StoreType) executorID = p.ExplainID().String() } err := SetPBColumnsDefaultValue(ctx, tsExec.Columns, p.Columns) - if p.Table.IsCommonHandle { - tsExec.PrimaryPrefixColumnIds = tables.PrimaryPrefixColumnIDs(p.Table) - } return &tipb.Executor{Tp: tipb.ExecType_TypeTableScan, TblScan: tsExec, ExecutorId: &executorID}, err } @@ -280,7 +278,11 @@ func (e *PhysicalExchangeReceiver) ToPB(ctx sessionctx.Context, storeType kv.Sto fieldTypes := make([]*tipb.FieldType, 0, len(e.Schema().Columns)) for _, column := range e.Schema().Columns { - fieldTypes = append(fieldTypes, expression.ToPBFieldType(column.RetType)) + pbType := expression.ToPBFieldType(column.RetType) + if column.RetType.Tp == mysql.TypeEnum { + pbType.Elems = append(pbType.Elems, column.RetType.Elems...) + } + fieldTypes = append(fieldTypes, pbType) } ecExec := &tipb.ExchangeReceiver{ EncodedTaskMeta: encodedTask, diff --git a/planner/core/point_get_plan.go b/planner/core/point_get_plan.go index 1e9c9ae48c081..0ec72814b934e 100644 --- a/planner/core/point_get_plan.go +++ b/planner/core/point_get_plan.go @@ -1100,7 +1100,8 @@ func getNameValuePairs(stmtCtx *stmtctx.StatementContext, tbl *model.TableInfo, } } // The converted result must be same as original datum. - cmp, err := d.CompareDatum(stmtCtx, &dVal) + // Compare them based on the dVal's type. + cmp, err := dVal.CompareDatum(stmtCtx, &d) if err != nil { return nil, false } else if cmp != 0 { diff --git a/planner/core/testdata/integration_serial_suite_in.json b/planner/core/testdata/integration_serial_suite_in.json index 66c6f69215358..3cf074702280d 100644 --- a/planner/core/testdata/integration_serial_suite_in.json +++ b/planner/core/testdata/integration_serial_suite_in.json @@ -68,6 +68,22 @@ "explain format = 'brief' select /*+ broadcast_join(fact_t,d1_t) */ count(*) from fact_t where not exists (select 1 from d1_t where d1_k = fact_t.d1_k and value > fact_t.col1)" ] }, + { + "name": "TestJoinNotSupportedByTiFlash", + "cases": [ + "explain format = 'brief' select * from table_1 a, table_1 b where a.bit_col = b.bit_col", + "explain format = 'brief' select * from table_1 a left join table_1 b on a.id = b.id and dayofmonth(a.datetime_col) > 100", + "explain format = 'brief' select * from table_1 a right join table_1 b on a.id = b.id and dayofmonth(b.datetime_col) > 100", + "explain format = 'brief' select * from table_1 a join table_1 b on a.id = b.id and dayofmonth(a.datetime_col) > dayofmonth(b.datetime_col)" + ] + }, + { + "name": "TestMPPNotSupportedInNewCollation", + "cases": [ + "explain format = 'brief' select * from table_1 a, table_1 b where a.id = b.id", + "explain format = 'brief' select /*+ agg_to_cop() */ count(*), id from table_1 group by id" + ] + }, { "name": "TestReadFromStorageHint", "cases": [ diff --git a/planner/core/testdata/integration_serial_suite_out.json b/planner/core/testdata/integration_serial_suite_out.json index 1acf566e898b4..63fc11c2a2a59 100644 --- a/planner/core/testdata/integration_serial_suite_out.json +++ b/planner/core/testdata/integration_serial_suite_out.json @@ -777,6 +777,75 @@ } ] }, + { + "Name": "TestJoinNotSupportedByTiFlash", + "Cases": [ + { + "SQL": "explain format = 'brief' select * from table_1 a, table_1 b where a.bit_col = b.bit_col", + "Plan": [ + "HashJoin 2.00 root inner join, equal:[eq(test.table_1.bit_col, test.table_1.bit_col)]", + "├─TableReader(Build) 2.00 root data:TableFullScan", + "│ └─TableFullScan 2.00 cop[tiflash] table:b keep order:false", + "└─TableReader(Probe) 2.00 root data:TableFullScan", + " └─TableFullScan 2.00 cop[tiflash] table:a keep order:false" + ] + }, + { + "SQL": "explain format = 'brief' select * from table_1 a left join table_1 b on a.id = b.id and dayofmonth(a.datetime_col) > 100", + "Plan": [ + "HashJoin 2.00 root left outer join, equal:[eq(test.table_1.id, test.table_1.id)], left cond:[gt(dayofmonth(test.table_1.datetime_col), 100)]", + "├─TableReader(Build) 2.00 root data:TableFullScan", + "│ └─TableFullScan 2.00 cop[tiflash] table:b keep order:false", + "└─TableReader(Probe) 2.00 root data:TableFullScan", + " └─TableFullScan 2.00 cop[tiflash] table:a keep order:false" + ] + }, + { + "SQL": "explain format = 'brief' select * from table_1 a right join table_1 b on a.id = b.id and dayofmonth(b.datetime_col) > 100", + "Plan": [ + "HashJoin 2.00 root right outer join, equal:[eq(test.table_1.id, test.table_1.id)], right cond:gt(dayofmonth(test.table_1.datetime_col), 100)", + "├─TableReader(Build) 2.00 root data:TableFullScan", + "│ └─TableFullScan 2.00 cop[tiflash] table:a keep order:false", + "└─TableReader(Probe) 2.00 root data:TableFullScan", + " └─TableFullScan 2.00 cop[tiflash] table:b keep order:false" + ] + }, + { + "SQL": "explain format = 'brief' select * from table_1 a join table_1 b on a.id = b.id and dayofmonth(a.datetime_col) > dayofmonth(b.datetime_col)", + "Plan": [ + "HashJoin 2.00 root inner join, equal:[eq(test.table_1.id, test.table_1.id)], other cond:gt(dayofmonth(test.table_1.datetime_col), dayofmonth(test.table_1.datetime_col))", + "├─TableReader(Build) 2.00 root data:TableFullScan", + "│ └─TableFullScan 2.00 cop[tiflash] table:b keep order:false", + "└─TableReader(Probe) 2.00 root data:TableFullScan", + " └─TableFullScan 2.00 cop[tiflash] table:a keep order:false" + ] + } + ] + }, + { + "Name": "TestMPPNotSupportedInNewCollation", + "Cases": [ + { + "SQL": "explain format = 'brief' select * from table_1 a, table_1 b where a.id = b.id", + "Plan": [ + "HashJoin 2.00 root inner join, equal:[eq(test.table_1.id, test.table_1.id)]", + "├─TableReader(Build) 2.00 root data:TableFullScan", + "│ └─TableFullScan 2.00 cop[tiflash] table:b keep order:false", + "└─TableReader(Probe) 2.00 root data:TableFullScan", + " └─TableFullScan 2.00 cop[tiflash] table:a keep order:false" + ] + }, + { + "SQL": "explain format = 'brief' select /*+ agg_to_cop() */ count(*), id from table_1 group by id", + "Plan": [ + "HashAgg 2.00 root group by:test.table_1.id, funcs:count(Column#5)->Column#4, funcs:firstrow(test.table_1.id)->test.table_1.id", + "└─TableReader 2.00 root data:HashAgg", + " └─HashAgg 2.00 batchCop[tiflash] group by:test.table_1.id, funcs:count(1)->Column#5", + " └─TableFullScan 2.00 batchCop[tiflash] table:table_1 keep order:false" + ] + } + ] + }, { "Name": "TestReadFromStorageHint", "Cases": [ diff --git a/server/conn.go b/server/conn.go index 04e099b875c96..b95f77cc06503 100644 --- a/server/conn.go +++ b/server/conn.go @@ -1495,7 +1495,8 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) { } retryable, err = cc.handleStmt(ctx, stmt, parserWarns, i == len(stmts)-1) if err != nil { - if cc.ctx.GetSessionVars().EnableTiFlashFallbackTiKV && errors.ErrorEqual(err, tikv.ErrTiFlashServerTimeout) && retryable { + _, allowTiFlashFallback := cc.ctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash] + if allowTiFlashFallback && errors.ErrorEqual(err, tikv.ErrTiFlashServerTimeout) && retryable { // When the TiFlash server seems down, we append a warning to remind the user to check the status of the TiFlash // server and fallback to TiKV. warns := append(parserWarns, stmtctx.SQLWarn{Level: stmtctx.WarnLevelError, Err: err}) @@ -1614,7 +1615,7 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm return pointPlans, nil } -// The first return value indicates whether the call of handleStmt has no side effect and can be retried to correct error. +// The first return value indicates whether the call of handleStmt has no side effect and can be retried. // Currently the first return value is used to fallback to TiKV when TiFlash is down. func (cc *clientConn) handleStmt(ctx context.Context, stmt ast.StmtNode, warns []stmtctx.SQLWarn, lastStmt bool) (bool, error) { ctx = context.WithValue(ctx, execdetails.StmtExecDetailKey, &execdetails.StmtExecDetails{}) @@ -1792,9 +1793,14 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool } for { - failpoint.Inject("secondNextErr", func(value failpoint.Value) { - if value.(bool) && !firstNext { + failpoint.Inject("fetchNextErr", func(value failpoint.Value) { + switch value.(string) { + case "firstNext": failpoint.Return(firstNext, tikv.ErrTiFlashServerTimeout) + case "secondNext": + if !firstNext { + failpoint.Return(firstNext, tikv.ErrTiFlashServerTimeout) + } } }) // Here server.tidbResultSet implements Next method. diff --git a/server/conn_stmt.go b/server/conn_stmt.go index 762a5be52cf39..de2d3a074e1cd 100644 --- a/server/conn_stmt.go +++ b/server/conn_stmt.go @@ -195,7 +195,8 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e } ctx = context.WithValue(ctx, execdetails.StmtExecDetailKey, &execdetails.StmtExecDetails{}) retryable, err := cc.executePreparedStmtAndWriteResult(ctx, stmt, args, useCursor) - if cc.ctx.GetSessionVars().EnableTiFlashFallbackTiKV && err != nil && errors.ErrorEqual(err, tikv.ErrTiFlashServerTimeout) && retryable { + _, allowTiFlashFallback := cc.ctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash] + if allowTiFlashFallback && err != nil && errors.ErrorEqual(err, tikv.ErrTiFlashServerTimeout) && retryable { // When the TiFlash server seems down, we append a warning to remind the user to check the status of the TiFlash // server and fallback to TiKV. prevErr := err @@ -210,8 +211,8 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e return err } -// The first return value indicates whether the call of executePreparedStmtAndWriteResult has no side effect and can be retried -// to correct error. Currently the first return value is used to fallback to TiKV when TiFlash is down. +// The first return value indicates whether the call of executePreparedStmtAndWriteResult has no side effect and can be retried. +// Currently the first return value is used to fallback to TiKV when TiFlash is down. func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stmt PreparedStatement, args []types.Datum, useCursor bool) (bool, error) { rs, err := stmt.Execute(ctx, args) if err != nil { diff --git a/server/conn_test.go b/server/conn_test.go index 2aab4e0d3733e..6add06c0369a8 100644 --- a/server/conn_test.go +++ b/server/conn_test.go @@ -761,22 +761,28 @@ func (ts *ConnTestSuite) TestTiFlashFallback(c *C) { c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/BatchCopRpcErrtiflash0", "return(\"tiflash0\")"), IsNil) // test COM_STMT_EXECUTE ctx := context.Background() - tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=1") + tk.MustExec("set @@tidb_allow_fallback_to_tikv='tiflash'") c.Assert(cc.handleStmtPrepare(ctx, "select sum(a) from t"), IsNil) c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}), IsNil) tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout")) // test COM_STMT_FETCH (cursor mode) c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x1, 0x1, 0x0, 0x0, 0x0}), IsNil) c.Assert(cc.handleStmtFetch(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}), NotNil) - tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=0") + tk.MustExec("set @@tidb_allow_fallback_to_tikv=''") c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}), NotNil) c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/BatchCopRpcErrtiflash0"), IsNil) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/server/fetchNextErr", "return(\"firstNext\")"), IsNil) + // test COM_STMT_EXECUTE (cursor mode) + tk.MustExec("set @@tidb_allow_fallback_to_tikv='tiflash'") + c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x1, 0x1, 0x0, 0x0, 0x0}), IsNil) + c.Assert(failpoint.Disable("github.com/pingcap/tidb/server/fetchNextErr"), IsNil) + // test that TiDB would not retry if the first execution already sends data to client - c.Assert(failpoint.Enable("github.com/pingcap/tidb/server/secondNextErr", "return(true)"), IsNil) - tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=1") + c.Assert(failpoint.Enable("github.com/pingcap/tidb/server/fetchNextErr", "return(\"secondNext\")"), IsNil) + tk.MustExec("set @@tidb_allow_fallback_to_tikv='tiflash'") c.Assert(cc.handleQuery(ctx, "select * from t t1 join t t2 on t1.a = t2.a"), NotNil) - c.Assert(failpoint.Disable("github.com/pingcap/tidb/server/secondNextErr"), IsNil) + c.Assert(failpoint.Disable("github.com/pingcap/tidb/server/fetchNextErr"), IsNil) // simple TiFlash query (unary + non-streaming) tk.MustExec("set @@tidb_allow_batch_cop=0; set @@tidb_allow_mpp=0;") @@ -809,9 +815,9 @@ func (ts *ConnTestSuite) TestTiFlashFallback(c *C) { func testFallbackWork(c *C, tk *testkit.TestKit, cc *clientConn, sql string) { ctx := context.Background() - tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=0") + tk.MustExec("set @@tidb_allow_fallback_to_tikv=''") c.Assert(tk.QueryToErr(sql), NotNil) - tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=1") + tk.MustExec("set @@tidb_allow_fallback_to_tikv='tiflash'") c.Assert(cc.handleQuery(ctx, sql), IsNil) tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout")) diff --git a/session/bootstrap.go b/session/bootstrap.go index ab43ce0a0e98b..b8c8d20b77489 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -464,8 +464,7 @@ const ( version59 = 59 // version60 redesigns `mysql.stats_extended` version60 = 60 - // version61 restore all SQL bindings. - version61 = 61 + // version61 will be redone in version67 // version62 add column ndv for mysql.stats_buckets. version62 = 62 // version63 fixes the bug that upgradeToVer51 would be missed when upgrading from v4.0 to a new version @@ -474,9 +473,13 @@ const ( version64 = 64 // version65 add mysql.stats_fm_sketch table. version65 = 65 + // version66 enables the feature `track_aggregate_memory_usage` by default. + version66 = 66 + // version67 restore all SQL bindings. + version67 = 67 // please make sure this is the largest version - currentBootstrapVersion = version65 + currentBootstrapVersion = version67 ) var ( @@ -541,11 +544,13 @@ var ( // We will redo upgradeToVer58 in upgradeToVer64, it is skipped here. upgradeToVer59, upgradeToVer60, - upgradeToVer61, + // We will redo upgradeToVer61 in upgradeToVer67, it is skipped here. upgradeToVer62, upgradeToVer63, upgradeToVer64, upgradeToVer65, + upgradeToVer66, + upgradeToVer67, } ) @@ -1319,8 +1324,8 @@ type bindInfo struct { source string } -func upgradeToVer61(s Session, ver int64) { - if ver >= version61 { +func upgradeToVer67(s Session, ver int64) { + if ver >= version67 { return } bindMap := make(map[string]bindInfo) @@ -1344,7 +1349,7 @@ func upgradeToVer61(s Session, ver int64) { WHERE source != 'builtin' ORDER BY update_time DESC`) if err != nil { - logutil.BgLogger().Fatal("upgradeToVer61 error", zap.Error(err)) + logutil.BgLogger().Fatal("upgradeToVer67 error", zap.Error(err)) } if rs != nil { defer terror.Call(rs.Close) @@ -1356,7 +1361,7 @@ func upgradeToVer61(s Session, ver int64) { for { err = rs.Next(context.TODO(), req) if err != nil { - logutil.BgLogger().Fatal("upgradeToVer61 error", zap.Error(err)) + logutil.BgLogger().Fatal("upgradeToVer67 error", zap.Error(err)) } if req.NumRows() == 0 { break @@ -1448,6 +1453,13 @@ func upgradeToVer65(s Session, ver int64) { doReentrantDDL(s, CreateStatsFMSketchTable) } +func upgradeToVer66(s Session, ver int64) { + if ver >= version66 { + return + } + mustExecute(s, "set @@global.tidb_track_aggregate_memory_usage = 1") +} + func writeOOMAction(s Session) { comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+" mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`, @@ -1562,6 +1574,12 @@ func doDMLWorks(s Session) { vVal = variable.BoolOn } } + if v.Name == variable.TiDBEnableAsyncCommit && config.GetGlobalConfig().Store == "tikv" { + vVal = variable.BoolOn + } + if v.Name == variable.TiDBEnable1PC && config.GetGlobalConfig().Store == "tikv" { + vVal = variable.BoolOn + } value := fmt.Sprintf(`("%s", "%s")`, strings.ToLower(k), vVal) values = append(values, value) } diff --git a/session/bootstrap_test.go b/session/bootstrap_test.go index 526133f6ea27c..60d2f537bf8eb 100644 --- a/session/bootstrap_test.go +++ b/session/bootstrap_test.go @@ -521,7 +521,7 @@ func (s *testBootstrapSuite) TestUpdateBindInfo(c *C) { bindText: "select /*+ use_index(t, idxb) */ * from t where a > 1", db: "test", originWithDB: "select * from `test` . `t` where `a` > ?", - bindWithDB: "SELECT /*+ use_index(t idxb)*/ * FROM test.t WHERE a > 1", + bindWithDB: "SELECT /*+ use_index(`t` `idxb`)*/ * FROM `test`.`t` WHERE `a` > 1", deleteText: "select * from test.t where a > 1", }, { @@ -529,9 +529,17 @@ func (s *testBootstrapSuite) TestUpdateBindInfo(c *C) { bindText: "select /*+ use_index(t, idx) */ count(1), max(a) from t group by b", db: "test", originWithDB: "select count ( ? ) , max ( `a` ) from `test` . `t` group by `b`", - bindWithDB: "SELECT /*+ use_index(t idx)*/ count(1),max(a) FROM test.t GROUP BY b", + bindWithDB: "SELECT /*+ use_index(`t` `idx`)*/ count(1),max(`a`) FROM `test`.`t` GROUP BY `b`", deleteText: "select count(1), max(a) from test.t group by b", }, + { + originText: "select * from `test` . `t` where `a` = (_charset) ?", + bindText: "SELECT * FROM test.t WHERE a = _utf8\\'ab\\'", + db: "test", + originWithDB: "select * from `test` . `t` where `a` = ?", + bindWithDB: "SELECT * FROM `test`.`t` WHERE `a` = 'ab'", + deleteText: "select * from test.t where a = 'c'", + }, } defer testleak.AfterTest(c)() ctx := context.Background() @@ -547,7 +555,7 @@ func (s *testBootstrapSuite) TestUpdateBindInfo(c *C) { ) mustExecSQL(c, se, sql) - upgradeToVer61(se, version60) + upgradeToVer67(se, version66) r := mustExecSQL(c, se, `select original_sql, bind_sql, default_db, status from mysql.bind_info where source != 'builtin'`) req := r.NewChunk() c.Assert(r.Next(ctx, req), IsNil) @@ -582,7 +590,7 @@ func (s *testBootstrapSuite) TestUpdateDuplicateBindInfo(c *C) { // The latest one. mustExecSQL(c, se, `insert into mysql.bind_info values('select * from test . t', 'select /*+ use_index(t, idx_b)*/ * from test.t', 'test', 'using', '2021-01-04 14:50:58.257', '2021-01-09 14:50:58.257', 'utf8', 'utf8_general_ci', 'manual')`) - upgradeToVer61(se, version60) + upgradeToVer67(se, version66) r := mustExecSQL(c, se, `select original_sql, bind_sql, default_db, status, create_time from mysql.bind_info where source != 'builtin'`) req := r.NewChunk() @@ -590,10 +598,51 @@ func (s *testBootstrapSuite) TestUpdateDuplicateBindInfo(c *C) { c.Assert(req.NumRows(), Equals, 1) row := req.GetRow(0) c.Assert(row.GetString(0), Equals, "select * from `test` . `t`") - c.Assert(row.GetString(1), Equals, "SELECT /*+ use_index(t idx_b)*/ * FROM test.t") + c.Assert(row.GetString(1), Equals, "SELECT /*+ use_index(`t` `idx_b`)*/ * FROM `test`.`t`") c.Assert(row.GetString(2), Equals, "") c.Assert(row.GetString(3), Equals, "using") c.Assert(row.GetTime(4).String(), Equals, "2021-01-04 14:50:58.257") c.Assert(r.Close(), IsNil) mustExecSQL(c, se, "delete from mysql.bind_info where original_sql = 'select * from test . t'") } + +func (s *testBootstrapSuite) TestUpgradeVersion66(c *C) { + var err error + defer testleak.AfterTest(c)() + ctx := context.Background() + store, _ := newStoreWithBootstrap(c, s.dbName) + defer func() { + c.Assert(store.Close(), IsNil) + }() + + seV65 := newSession(c, store, s.dbName) + txn, err := store.Begin() + c.Assert(err, IsNil) + m := meta.NewMeta(txn) + err = m.FinishBootstrap(int64(65)) + c.Assert(err, IsNil) + err = txn.Commit(context.Background()) + c.Assert(err, IsNil) + mustExecSQL(c, seV65, "update mysql.tidb set variable_value='65' where variable_name='tidb_server_version'") + mustExecSQL(c, seV65, "set @@global.tidb_track_aggregate_memory_usage = 0") + mustExecSQL(c, seV65, "commit") + unsetStoreBootstrapped(store.UUID()) + ver, err := getBootstrapVersion(seV65) + c.Assert(err, IsNil) + c.Assert(ver, Equals, int64(65)) + + domV66, err := BootstrapSession(store) + c.Assert(err, IsNil) + defer domV66.Close() + seV66 := newSession(c, store, s.dbName) + ver, err = getBootstrapVersion(seV66) + c.Assert(err, IsNil) + c.Assert(ver, Equals, int64(currentBootstrapVersion)) + r := mustExecSQL(c, seV66, `select @@global.tidb_track_aggregate_memory_usage, @@session.tidb_track_aggregate_memory_usage`) + req := r.NewChunk() + c.Assert(r.Next(ctx, req), IsNil) + c.Assert(req.NumRows(), Equals, 1) + row := req.GetRow(0) + c.Assert(row.GetInt64(0), Equals, int64(1)) + c.Assert(row.GetInt64(1), Equals, int64(1)) +} diff --git a/session/clustered_index_test.go b/session/clustered_index_test.go index d189f12a2ccbd..9589d58426c0f 100644 --- a/session/clustered_index_test.go +++ b/session/clustered_index_test.go @@ -437,6 +437,24 @@ func (s *testClusteredSerialSuite) TestClusteredIndexSyntax(c *C) { } } +func (s *testClusteredSerialSuite) TestPrefixClusteredIndexAddIndexAndRecover(c *C) { + tk1 := testkit.NewTestKit(c, s.store) + tk1.MustExec("use test;") + tk1.MustExec("drop table if exists t;") + defer func() { + tk1.MustExec("drop table if exists t;") + }() + + tk1.MustExec("create table t(a char(3), b char(3), primary key(a(1)) clustered)") + tk1.MustExec("insert into t values ('aaa', 'bbb')") + tk1.MustExec("alter table t add index idx(b)") + tk1.MustQuery("select * from t use index(idx)").Check(testkit.Rows("aaa bbb")) + tk1.MustExec("admin check table t") + tk1.MustExec("admin recover index t idx") + tk1.MustQuery("select * from t use index(idx)").Check(testkit.Rows("aaa bbb")) + tk1.MustExec("admin check table t") +} + // https://github.com/pingcap/tidb/issues/23106 func (s *testClusteredSerialSuite) TestClusteredIndexDecodeRestoredDataV5(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) diff --git a/session/session.go b/session/session.go index 365d06ed47dc2..becca9333974d 100644 --- a/session/session.go +++ b/session/session.go @@ -2496,7 +2496,7 @@ var builtinGlobalVariable = []string{ variable.TiDBTrackAggregateMemoryUsage, variable.TiDBMultiStatementMode, variable.TiDBEnableExchangePartition, - variable.TiDBEnableTiFlashFallbackTiKV, + variable.TiDBAllowFallbackToTiKV, } // loadCommonGlobalVariablesIfNeeded loads and applies commonly used global variables for the session. diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 9b96ce9979c14..1f939a607ac63 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -815,8 +815,9 @@ type SessionVars struct { // TiDBEnableExchangePartition indicates whether to enable exchange partition TiDBEnableExchangePartition bool - // EnableTiFlashFallbackTiKV indicates whether to fallback to TiKV when TiFlash is unavailable. - EnableTiFlashFallbackTiKV bool + // AllowFallbackToTiKV indicates the engine types whose unavailability triggers fallback to TiKV. + // Now we only support TiFlash. + AllowFallbackToTiKV map[kv.StoreType]struct{} } // CheckAndGetTxnScope will return the transaction scope we should use in the current session. @@ -997,7 +998,7 @@ func NewSessionVars() *SessionVars { GuaranteeLinearizability: DefTiDBGuaranteeLinearizability, AnalyzeVersion: DefTiDBAnalyzeVersion, EnableIndexMergeJoin: DefTiDBEnableIndexMergeJoin, - EnableTiFlashFallbackTiKV: DefTiDBEnableTiFlashFallbackTiKV, + AllowFallbackToTiKV: make(map[kv.StoreType]struct{}), } vars.KVVars = kv.NewVariables(&vars.Killed) vars.Concurrency = Concurrency{ @@ -1749,8 +1750,14 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.MultiStatementMode = TiDBOptMultiStmt(val) case TiDBEnableExchangePartition: s.TiDBEnableExchangePartition = TiDBOptOn(val) - case TiDBEnableTiFlashFallbackTiKV: - s.EnableTiFlashFallbackTiKV = TiDBOptOn(val) + case TiDBAllowFallbackToTiKV: + s.AllowFallbackToTiKV = make(map[kv.StoreType]struct{}) + for _, engine := range strings.Split(val, ",") { + switch engine { + case kv.TiFlash.Name(): + s.AllowFallbackToTiKV[kv.TiFlash] = struct{}{} + } + } } s.systems[name] = val return nil diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index efced6ec1bd62..c8aae167e6b34 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -670,7 +670,30 @@ var defaultSysVars = []*SysVar{ {Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableFastAnalyze, Value: BoolToOnOff(DefTiDBUseFastAnalyze), Type: TypeBool}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBSkipIsolationLevelCheck, Value: BoolToOnOff(DefTiDBSkipIsolationLevelCheck), Type: TypeBool}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableRateLimitAction, Value: BoolToOnOff(DefTiDBEnableRateLimitAction), Type: TypeBool}, - {Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableTiFlashFallbackTiKV, Value: BoolToOnOff(DefTiDBEnableTiFlashFallbackTiKV), Type: TypeBool}, + {Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowFallbackToTiKV, Value: "", Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) { + if normalizedValue == "" { + return "", nil + } + engines := strings.Split(normalizedValue, ",") + var formatVal string + storeTypes := make(map[kv.StoreType]struct{}) + for i, engine := range engines { + engine = strings.TrimSpace(engine) + switch { + case strings.EqualFold(engine, kv.TiFlash.Name()): + if _, ok := storeTypes[kv.TiFlash]; !ok { + if i != 0 { + formatVal += "," + } + formatVal += kv.TiFlash.Name() + storeTypes[kv.TiFlash] = struct{}{} + } + default: + return normalizedValue, ErrWrongValueForVar.GenWithStackByArgs(TiDBAllowFallbackToTiKV, normalizedValue) + } + } + return formatVal, nil + }}, /* The following variable is defined as session scope but is actually server scope. */ {Scope: ScopeSession, Name: TiDBGeneralLog, Value: BoolToOnOff(DefTiDBGeneralLog), Type: TypeBool}, {Scope: ScopeSession, Name: TiDBPProfSQLCPU, Value: strconv.Itoa(DefTiDBPProfSQLCPU), Type: TypeInt, MinValue: 0, MaxValue: 1}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 466b97cfad5ba..3b8851c1e7ff4 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -526,8 +526,9 @@ const ( // TiDBEnableExchangePartition indicates whether to enable exchange partition. TiDBEnableExchangePartition = "tidb_enable_exchange_partition" - // TiDBEnableTiFlashFallbackTiKV indicates whether to fallback to TiKV when TiFlash is unavailable. - TiDBEnableTiFlashFallbackTiKV = "tidb_enable_tiflash_fallback_tikv" + // TiDBAllowFallbackToTiKV indicates the engine types whose unavailability triggers fallback to TiKV. + // Now we only support TiFlash. + TiDBAllowFallbackToTiKV = "tidb_allow_fallback_to_tikv" ) // TiDB vars that have only global scope @@ -666,9 +667,8 @@ const ( DefTiDBGuaranteeLinearizability = true DefTiDBAnalyzeVersion = 1 DefTiDBEnableIndexMergeJoin = false - DefTiDBTrackAggregateMemoryUsage = false + DefTiDBTrackAggregateMemoryUsage = true DefTiDBEnableExchangePartition = false - DefTiDBEnableTiFlashFallbackTiKV = false ) // Process global variables. @@ -706,6 +706,7 @@ var FeatureSwitchVariables = []string{ TiDBEnable1PC, TiDBGuaranteeLinearizability, TiDBEnableClusteredIndex, + TiDBTrackAggregateMemoryUsage, } // FilterImplicitFeatureSwitch is used to filter result of show variables, these switches should be turn blind to users. diff --git a/sessionctx/variable/varsutil_test.go b/sessionctx/variable/varsutil_test.go index f389adff3a8a4..b5ad1c5b20486 100644 --- a/sessionctx/variable/varsutil_test.go +++ b/sessionctx/variable/varsutil_test.go @@ -105,7 +105,6 @@ func (s *testVarsutilSuite) TestNewSessionVars(c *C) { c.Assert(vars.ShardAllocateStep, Equals, int64(DefTiDBShardAllocateStep)) c.Assert(vars.EnableChangeColumnType, Equals, DefTiDBChangeColumnType) c.Assert(vars.AnalyzeVersion, Equals, DefTiDBAnalyzeVersion) - c.Assert(vars.EnableTiFlashFallbackTiKV, Equals, DefTiDBEnableTiFlashFallbackTiKV) assertFieldsGreaterThanZero(c, reflect.ValueOf(vars.MemQuota)) assertFieldsGreaterThanZero(c, reflect.ValueOf(vars.BatchSize)) @@ -597,6 +596,12 @@ func (s *testVarsutilSuite) TestValidate(c *C) { {TiDBEnableAmendPessimisticTxn, "0", false}, {TiDBEnableAmendPessimisticTxn, "1", false}, {TiDBEnableAmendPessimisticTxn, "256", true}, + {TiDBAllowFallbackToTiKV, "", false}, + {TiDBAllowFallbackToTiKV, "tiflash", false}, + {TiDBAllowFallbackToTiKV, " tiflash ", false}, + {TiDBAllowFallbackToTiKV, "tikv", true}, + {TiDBAllowFallbackToTiKV, "tidb", true}, + {TiDBAllowFallbackToTiKV, "tiflash,tikv,tidb", true}, } for _, t := range tests { diff --git a/store/driver/tikv_driver.go b/store/driver/tikv_driver.go index d7fe204f7afe1..6096ec5403121 100644 --- a/store/driver/tikv_driver.go +++ b/store/driver/tikv_driver.go @@ -307,3 +307,15 @@ func (s *tikvStore) BeginWithOption(option kv.TransactionOption) (kv.Transaction } return txn_driver.NewTiKVTxn(txn), err } + +// GetSnapshot gets a snapshot that is able to read any data which data is <= ver. +// if ver is MaxVersion or > current max committed version, we will use current version for this snapshot. +func (s *tikvStore) GetSnapshot(ver kv.Version) kv.Snapshot { + return s.KVStore.GetSnapshot(ver.Ver) +} + +// CurrentVersion returns current max committed version with the given txnScope (local or global). +func (s *tikvStore) CurrentVersion(txnScope string) (kv.Version, error) { + ver, err := s.KVStore.CurrentTimestamp(txnScope) + return kv.NewVersion(ver), err +} diff --git a/store/helper/helper.go b/store/helper/helper.go index 3646d92cafe62..71ddb5c942a9e 100644 --- a/store/helper/helper.go +++ b/store/helper/helper.go @@ -55,6 +55,7 @@ type Storage interface { Close() error UUID() string CurrentVersion(txnScope string) (kv.Version, error) + CurrentTimestamp(txnScop string) (uint64, error) GetOracle() oracle.Oracle SupportDeleteRange() (supported bool) Name() string diff --git a/store/mockstore/unistore.go b/store/mockstore/unistore.go index 28a913f5ddbbf..6842fb59f8d9f 100644 --- a/store/mockstore/unistore.go +++ b/store/mockstore/unistore.go @@ -101,6 +101,18 @@ func (s *mockStorage) BeginWithOption(option kv.TransactionOption) (kv.Transacti return newTiKVTxn(txn, err) } +// GetSnapshot gets a snapshot that is able to read any data which data is <= ver. +// if ver is MaxVersion or > current max committed version, we will use current version for this snapshot. +func (s *mockStorage) GetSnapshot(ver kv.Version) kv.Snapshot { + return s.KVStore.GetSnapshot(ver.Ver) +} + +// CurrentVersion returns current max committed version with the given txnScope (local or global). +func (s *mockStorage) CurrentVersion(txnScope string) (kv.Version, error) { + ver, err := s.KVStore.CurrentTimestamp(txnScope) + return kv.NewVersion(ver), err +} + func newTiKVTxn(txn *tikv.KVTxn, err error) (kv.Transaction, error) { if err != nil { return nil, err diff --git a/store/mockstore/unistore/cophandler/mpp.go b/store/mockstore/unistore/cophandler/mpp.go index c77caa03f8f67..9849f53921099 100644 --- a/store/mockstore/unistore/cophandler/mpp.go +++ b/store/mockstore/unistore/cophandler/mpp.go @@ -132,7 +132,11 @@ func (b *mppExecBuilder) buildMPPExchangeReceiver(pb *tipb.ExchangeReceiver) (*e } for _, pbType := range pb.FieldTypes { - e.fieldTypes = append(e.fieldTypes, expression.FieldTypeFromPB(pbType)) + tp := expression.FieldTypeFromPB(pbType) + if tp.Tp == mysql.TypeEnum { + tp.Elems = append(tp.Elems, pbType.Elems...) + } + e.fieldTypes = append(e.fieldTypes, tp) } return e, nil } diff --git a/store/tikv/1pc_test.go b/store/tikv/1pc_test.go index b12d22a418792..f3dd54593257f 100644 --- a/store/tikv/1pc_test.go +++ b/store/tikv/1pc_test.go @@ -130,7 +130,7 @@ func (s *testOnePCSuite) Test1PC(c *C) { // Check all keys keys := [][]byte{k1, k2, k3, k4, k5, k6} values := [][]byte{v1, v2, v3, v4, v5, v6New} - ver, err := s.store.CurrentVersion(oracle.GlobalTxnScope) + ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) c.Assert(err, IsNil) snap := s.store.GetSnapshot(ver) for i, k := range keys { @@ -214,7 +214,7 @@ func (s *testOnePCSuite) Test1PCDisallowMultiRegion(c *C) { c.Assert(txn.committer.onePCCommitTS, Equals, uint64(0)) c.Assert(txn.committer.commitTS, Greater, txn.startTS) - ver, err := s.store.CurrentVersion(oracle.GlobalTxnScope) + ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) c.Assert(err, IsNil) snap := s.store.GetSnapshot(ver) for i, k := range keys { diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 08a3b9992930c..5e0e4cbdb91c8 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -1277,12 +1277,12 @@ func (c *twoPhaseCommitter) amendPessimisticLock(ctx context.Context, addMutatio if err != nil { // KeysNeedToLock won't change, so don't async rollback pessimistic locks here for write conflict. if terror.ErrorEqual(kv.ErrWriteConflict, err) { - newForUpdateTSVer, err := c.store.CurrentVersion(oracle.GlobalTxnScope) + newForUpdateTSVer, err := c.store.CurrentTimestamp(oracle.GlobalTxnScope) if err != nil { return errors.Trace(err) } - lCtx.ForUpdateTS = newForUpdateTSVer.Ver - c.forUpdateTS = newForUpdateTSVer.Ver + lCtx.ForUpdateTS = newForUpdateTSVer + c.forUpdateTS = newForUpdateTSVer logutil.Logger(ctx).Info("amend pessimistic lock pessimistic retry lock", zap.Uint("tryTimes", tryTimes), zap.Uint64("startTS", c.startTS), zap.Uint64("newForUpdateTS", c.forUpdateTS)) diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index a31d88355a3d8..4d33968e75cba 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -333,12 +333,12 @@ func (s *testCommitterSuite) mustGetRegionID(c *C, key []byte) uint64 { } func (s *testCommitterSuite) isKeyLocked(c *C, key []byte) bool { - ver, err := s.store.CurrentVersion(oracle.GlobalTxnScope) + ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) c.Assert(err, IsNil) bo := NewBackofferWithVars(context.Background(), getMaxBackoff, nil) req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{ Key: key, - Version: ver.Ver, + Version: ver, }) loc, err := s.store.regionCache.LocateKey(bo, key) c.Assert(err, IsNil) diff --git a/store/tikv/async_commit_test.go b/store/tikv/async_commit_test.go index e63e356f6ea9f..343c37ee3e345 100644 --- a/store/tikv/async_commit_test.go +++ b/store/tikv/async_commit_test.go @@ -76,11 +76,11 @@ func (s *testAsyncCommitCommon) mustGetFromTxn(c *C, txn *KVTxn, key, expectedVa } func (s *testAsyncCommitCommon) mustGetLock(c *C, key []byte) *Lock { - ver, err := s.store.CurrentVersion(oracle.GlobalTxnScope) + ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) c.Assert(err, IsNil) req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{ Key: key, - Version: ver.Ver, + Version: ver, }) bo := NewBackofferWithVars(context.Background(), 5000, nil) loc, err := s.store.regionCache.LocateKey(bo, key) @@ -96,21 +96,21 @@ func (s *testAsyncCommitCommon) mustGetLock(c *C, key []byte) *Lock { } func (s *testAsyncCommitCommon) mustPointGet(c *C, key, expectedValue []byte) { - snap := s.store.GetSnapshot(kv.MaxVersion) + snap := s.store.GetSnapshot(maxTimestamp) value, err := snap.Get(context.Background(), key) c.Assert(err, IsNil) c.Assert(value, BytesEquals, expectedValue) } func (s *testAsyncCommitCommon) mustGetFromSnapshot(c *C, version uint64, key, expectedValue []byte) { - snap := s.store.GetSnapshot(kv.Version{Ver: version}) + snap := s.store.GetSnapshot(version) value, err := snap.Get(context.Background(), key) c.Assert(err, IsNil) c.Assert(value, BytesEquals, expectedValue) } func (s *testAsyncCommitCommon) mustGetNoneFromSnapshot(c *C, version uint64, key []byte) { - snap := s.store.GetSnapshot(kv.Version{Ver: version}) + snap := s.store.GetSnapshot(version) _, err := snap.Get(context.Background(), key) c.Assert(errors.Cause(err), Equals, kv.ErrNotExist) } diff --git a/store/tikv/config/client.go b/store/tikv/config/client.go index d2993a1662a6d..97dc281722093 100644 --- a/store/tikv/config/client.go +++ b/store/tikv/config/client.go @@ -70,15 +70,12 @@ type AsyncCommit struct { KeysLimit uint `toml:"keys-limit" json:"keys-limit"` // Use async commit only if the total size of keys does not exceed TotalKeySizeLimit. TotalKeySizeLimit uint64 `toml:"total-key-size-limit" json:"total-key-size-limit"` - // The following two fields should never be modified by the user, so tags are not provided - // on purpose. // The duration within which is safe for async commit or 1PC to commit with an old schema. - // It is only changed in tests. - // TODO: 1PC is not part of async commit. These two fields should be moved to a more suitable - // place. - SafeWindow time.Duration + // The following two fields should NOT be modified in most cases. If both async commit + // and 1PC are disabled in the whole cluster, they can be set to zero to avoid waiting in DDLs. + SafeWindow time.Duration `toml:"safe-window" json:"safe-window"` // The duration in addition to SafeWindow to make DDL safe. - AllowedClockDrift time.Duration + AllowedClockDrift time.Duration `toml:"allowed-clock-drift" json:"allowed-clock-drift"` } // CoprocessorCache is the config for coprocessor cache. diff --git a/store/tikv/interface.go b/store/tikv/interface.go index cd6f4bab253f6..507cce87535f0 100644 --- a/store/tikv/interface.go +++ b/store/tikv/interface.go @@ -17,7 +17,6 @@ import ( "context" "time" - "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/store/tikv/tikvrpc" ) @@ -51,15 +50,12 @@ type Storage interface { // Closed returns the closed channel. Closed() <-chan struct{} - // GetSnapshot gets a snapshot that is able to read any data which data is <= ver. - // if ver is MaxVersion or > current max committed version, we will use current version for this snapshot. - GetSnapshot(ver kv.Version) kv.Snapshot // Close store Close() error // UUID return a unique ID which represents a Storage. UUID() string - // CurrentVersion returns current max committed version with the given txnScope (local or global). - CurrentVersion(txnScope string) (kv.Version, error) + // CurrentTimestamp returns current timestamp with the given txnScope (local or global). + CurrentTimestamp(txnScope string) (uint64, error) // GetOracle gets a timestamp oracle client. GetOracle() oracle.Oracle // SupportDeleteRange gets the storage support delete range or not. diff --git a/store/tikv/kv.go b/store/tikv/kv.go index a2b33e7c41b70..6fe6f5c9518be 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -212,9 +212,9 @@ func (s *KVStore) beginWithExactStaleness(txnScope string, prevSec uint64) (*KVT } // GetSnapshot gets a snapshot that is able to read any data which data is <= ver. -// if ver is MaxVersion or > current max committed version, we will use current version for this snapshot. -func (s *KVStore) GetSnapshot(ver kv.Version) kv.Snapshot { - snapshot := newTiKVSnapshot(s, ver, s.nextReplicaReadSeed()) +// if ts is MaxVersion or > current max committed version, we will use current version for this snapshot. +func (s *KVStore) GetSnapshot(ts uint64) kv.Snapshot { + snapshot := newTiKVSnapshot(s, ts, s.nextReplicaReadSeed()) return snapshot } @@ -244,14 +244,14 @@ func (s *KVStore) UUID() string { return s.uuid } -// CurrentVersion returns current max committed version with the given txnScope (local or global). -func (s *KVStore) CurrentVersion(txnScope string) (kv.Version, error) { +// CurrentTimestamp returns current timestamp with the given txnScope (local or global). +func (s *KVStore) CurrentTimestamp(txnScope string) (uint64, error) { bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil) startTS, err := s.getTimestampWithRetry(bo, txnScope) if err != nil { - return kv.NewVersion(0), errors.Trace(err) + return 0, errors.Trace(err) } - return kv.NewVersion(startTS), nil + return startTS, nil } func (s *KVStore) getTimestampWithRetry(bo *Backoffer, txnScope string) (uint64, error) { diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index 1399a7770cae5..84d2cadde7455 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -160,7 +160,7 @@ func (s *testLockSuite) TestScanLockResolveWithBatchGet(c *C) { keys = append(keys, []byte{ch}) } - ver, err := s.store.CurrentVersion(oracle.GlobalTxnScope) + ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) c.Assert(err, IsNil) snapshot := newTiKVSnapshot(s.store, ver, 0) m, err := snapshot.BatchGet(context.Background(), keys) @@ -401,12 +401,12 @@ func (s *testLockSuite) prewriteTxnWithTTL(c *C, txn *KVTxn, ttl uint64) { } func (s *testLockSuite) mustGetLock(c *C, key []byte) *Lock { - ver, err := s.store.CurrentVersion(oracle.GlobalTxnScope) + ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) c.Assert(err, IsNil) bo := NewBackofferWithVars(context.Background(), getMaxBackoff, nil) req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{ Key: key, - Version: ver.Ver, + Version: ver, }) loc, err := s.store.regionCache.LocateKey(bo, key) c.Assert(err, IsNil) diff --git a/store/tikv/safepoint_test.go b/store/tikv/safepoint_test.go index 44799fe05c7a8..b1cea4bf6e5bb 100644 --- a/store/tikv/safepoint_test.go +++ b/store/tikv/safepoint_test.go @@ -113,7 +113,7 @@ func (s *testSafePointSuite) TestSafePoint(c *C) { s.waitUntilErrorPlugIn(txn4.startTS) - snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: txn4.StartTS()}, 0) + snapshot := newTiKVSnapshot(s.store, txn4.StartTS(), 0) _, batchgeterr := snapshot.BatchGet(context.Background(), keys) c.Assert(batchgeterr, NotNil) isFallBehind = terror.ErrorEqual(errors.Cause(geterr2), ErrGCTooEarly) diff --git a/store/tikv/scan.go b/store/tikv/scan.go index 1c376fd7b5127..b8d9a1520eab0 100644 --- a/store/tikv/scan.go +++ b/store/tikv/scan.go @@ -86,7 +86,7 @@ func (s *Scanner) Value() []byte { // Next return next element. func (s *Scanner) Next() error { - bo := NewBackofferWithVars(context.WithValue(context.Background(), TxnStartKey, s.snapshot.version.Ver), scannerNextMaxBackoff, s.snapshot.vars) + bo := NewBackofferWithVars(context.WithValue(context.Background(), TxnStartKey, s.snapshot.version), scannerNextMaxBackoff, s.snapshot.vars) if !s.valid { return errors.New("scanner iterator is invalid") } @@ -140,7 +140,7 @@ func (s *Scanner) Close() { } func (s *Scanner) startTS() uint64 { - return s.snapshot.version.Ver + return s.snapshot.version } func (s *Scanner) resolveCurrentLock(bo *Backoffer, current *pb.KvPair) error { @@ -245,7 +245,7 @@ func (s *Scanner) getData(bo *Backoffer) error { if err != nil { return errors.Trace(err) } - msBeforeExpired, _, err := newLockResolver(s.snapshot.store).ResolveLocks(bo, s.snapshot.version.Ver, []*Lock{lock}) + msBeforeExpired, _, err := newLockResolver(s.snapshot.store).ResolveLocks(bo, s.snapshot.version, []*Lock{lock}) if err != nil { return errors.Trace(err) } diff --git a/store/tikv/scan_mock_test.go b/store/tikv/scan_mock_test.go index 6e5b23ab25bb1..4f184e5c556be 100644 --- a/store/tikv/scan_mock_test.go +++ b/store/tikv/scan_mock_test.go @@ -17,7 +17,6 @@ import ( "context" . "github.com/pingcap/check" - "github.com/pingcap/tidb/kv" ) type testScanMockSuite struct { @@ -41,7 +40,7 @@ func (s *testScanMockSuite) TestScanMultipleRegions(c *C) { txn, err = store.Begin() c.Assert(err, IsNil) - snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()}, 0) + snapshot := newTiKVSnapshot(store, txn.StartTS(), 0) scanner, err := newScanner(snapshot, []byte("a"), nil, 10, false) c.Assert(err, IsNil) for ch := byte('a'); ch <= byte('z'); ch++ { @@ -74,7 +73,7 @@ func (s *testScanMockSuite) TestReverseScan(c *C) { txn, err = store.Begin() c.Assert(err, IsNil) - snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()}, 0) + snapshot := newTiKVSnapshot(store, txn.StartTS(), 0) scanner, err := newScanner(snapshot, nil, []byte("z"), 10, true) c.Assert(err, IsNil) for ch := byte('y'); ch >= byte('a'); ch-- { diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 4619fd0e86901..14a7e2091a7c5 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -48,12 +48,13 @@ var ( const ( scanBatchSize = 256 batchGetSize = 5120 + maxTimestamp = math.MaxUint64 ) // tikvSnapshot implements the kv.Snapshot interface. type tikvSnapshot struct { store *KVStore - version kv.Version + version uint64 isolationLevel kv.IsoLevel priority pb.CommandPri notFillCache bool @@ -87,15 +88,15 @@ type tikvSnapshot struct { } // newTiKVSnapshot creates a snapshot of an TiKV store. -func newTiKVSnapshot(store *KVStore, ver kv.Version, replicaReadSeed uint32) *tikvSnapshot { +func newTiKVSnapshot(store *KVStore, ts uint64, replicaReadSeed uint32) *tikvSnapshot { // Sanity check for snapshot version. - if ver.Ver >= math.MaxInt64 && ver.Ver != math.MaxUint64 { - err := errors.Errorf("try to get snapshot with a large ts %d", ver.Ver) + if ts >= math.MaxInt64 && ts != math.MaxUint64 { + err := errors.Errorf("try to get snapshot with a large ts %d", ts) panic(err) } return &tikvSnapshot{ store: store, - version: ver, + version: ts, priority: pb.CommandPri_Normal, vars: kv.DefaultVars, replicaReadSeed: replicaReadSeed, @@ -110,7 +111,7 @@ func (s *tikvSnapshot) setSnapshotTS(ts uint64) { panic(err) } // Invalidate cache if the snapshotTS change! - s.version.Ver = ts + s.version = ts s.mu.Lock() s.mu.cached = nil s.mu.Unlock() @@ -146,7 +147,7 @@ func (s *tikvSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string] // We want [][]byte instead of []kv.Key, use some magic to save memory. bytesKeys := *(*[][]byte)(unsafe.Pointer(&keys)) - ctx = context.WithValue(ctx, TxnStartKey, s.version.Ver) + ctx = context.WithValue(ctx, TxnStartKey, s.version) bo := NewBackofferWithVars(ctx, batchGetMaxBackoff, s.vars) // Create a map to collect key-values from region servers. @@ -165,7 +166,7 @@ func (s *tikvSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string] return nil, errors.Trace(err) } - err = s.store.CheckVisibility(s.version.Ver) + err = s.store.CheckVisibility(s.version) if err != nil { return nil, errors.Trace(err) } @@ -256,7 +257,7 @@ func (s *tikvSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, colle if e := <-ch; e != nil { logutil.BgLogger().Debug("snapshot batchGet failed", zap.Error(e), - zap.Uint64("txnStartTS", s.version.Ver)) + zap.Uint64("txnStartTS", s.version)) err = e } } @@ -281,7 +282,7 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll s.mu.RLock() req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdBatchGet, &pb.BatchGetRequest{ Keys: pending, - Version: s.version.Ver, + Version: s.version, }, s.mu.replicaRead, &s.replicaReadSeed, pb.Context{ Priority: s.priority, NotFillCache: s.notFillCache, @@ -349,7 +350,7 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll s.mergeExecDetail(batchGetResp.ExecDetailsV2) } if len(lockedKeys) > 0 { - msBeforeExpired, err := cli.ResolveLocks(bo, s.version.Ver, locks) + msBeforeExpired, err := cli.ResolveLocks(bo, s.version, locks) if err != nil { return errors.Trace(err) } @@ -377,14 +378,14 @@ func (s *tikvSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) { metrics.TxnCmdHistogramWithGet.Observe(time.Since(start).Seconds()) }(time.Now()) - ctx = context.WithValue(ctx, TxnStartKey, s.version.Ver) + ctx = context.WithValue(ctx, TxnStartKey, s.version) bo := NewBackofferWithVars(ctx, getMaxBackoff, s.vars) val, err := s.get(ctx, bo, k) s.recordBackoffInfo(bo) if err != nil { return nil, errors.Trace(err) } - err = s.store.CheckVisibility(s.version.Ver) + err = s.store.CheckVisibility(s.version) if err != nil { return nil, errors.Trace(err) } @@ -422,7 +423,7 @@ func (s *tikvSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte // Secondary locks or async commit locks cannot be ignored when getting using the max version. // So we concurrently get a TS from PD and use it in retries to avoid unnecessary blocking. var tsFuture oracle.Future - if s.version == kv.MaxVersion { + if s.version == maxTimestamp { tsFuture = s.store.oracle.GetTimestampAsync(ctx, &oracle.Option{TxnScope: s.txnScope}) } failpoint.Inject("snapshotGetTSAsync", nil) @@ -439,7 +440,7 @@ func (s *tikvSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &pb.GetRequest{ Key: k, - Version: s.version.Ver, + Version: s.version, }, s.mu.replicaRead, &s.replicaReadSeed, pb.Context{ Priority: s.priority, NotFillCache: s.notFillCache, @@ -489,13 +490,13 @@ func (s *tikvSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte return nil, errors.Trace(err) } - snapVer := s.version.Ver - if s.version == kv.MaxVersion { + snapVer := s.version + if s.version == maxTimestamp { newTS, err := tsFuture.Wait() if err != nil { return nil, errors.Trace(err) } - s.version = kv.NewVersion(newTS) + s.version = newTS req.Req.(*pb.GetRequest).Version = newTS // skip lock resolving and backoff if the lock does not block the read if newTS < lock.TxnID || newTS < lock.MinCommitTS { diff --git a/store/tikv/snapshot_fail_test.go b/store/tikv/snapshot_fail_test.go index 472d65ed0a5a6..dba9942f5367c 100644 --- a/store/tikv/snapshot_fail_test.go +++ b/store/tikv/snapshot_fail_test.go @@ -142,7 +142,7 @@ func (s *testSnapshotFailSuite) TestScanResponseKeyError(c *C) { func (s *testSnapshotFailSuite) TestRetryPointGetWithTS(c *C) { defer s.cleanup(c) - snapshot := s.store.GetSnapshot(kv.MaxVersion) + snapshot := s.store.GetSnapshot(maxTimestamp) c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/snapshotGetTSAsync", `pause`), IsNil) ch := make(chan error) go func() { @@ -200,7 +200,7 @@ func (s *testSnapshotFailSuite) TestRetryPointGetResolveTS(c *C) { // Wait until prewrite finishes time.Sleep(200 * time.Millisecond) // Should get nothing with max version, and **not pushing forward minCommitTS** of the primary lock - snapshot := s.store.GetSnapshot(kv.MaxVersion) + snapshot := s.store.GetSnapshot(maxTimestamp) _, err = snapshot.Get(context.Background(), []byte("k2")) c.Assert(err, ErrorMatches, ".*key not exist") @@ -209,7 +209,7 @@ func (s *testSnapshotFailSuite) TestRetryPointGetResolveTS(c *C) { <-ch // check the minCommitTS is not pushed forward - snapshot = s.store.GetSnapshot(kv.Version{Ver: initialCommitTS}) + snapshot = s.store.GetSnapshot(initialCommitTS) v, err := snapshot.Get(context.Background(), []byte("k2")) c.Assert(err, IsNil) c.Assert(v, DeepEquals, []byte("v2")) diff --git a/store/tikv/snapshot_test.go b/store/tikv/snapshot_test.go index 7f41be4fb9213..4e0b9a3f5bc3a 100644 --- a/store/tikv/snapshot_test.go +++ b/store/tikv/snapshot_test.go @@ -71,7 +71,7 @@ func (s *testSnapshotSuite) beginTxn(c *C) *KVTxn { func (s *testSnapshotSuite) checkAll(keys []kv.Key, c *C) { txn := s.beginTxn(c) - snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: txn.StartTS()}, 0) + snapshot := newTiKVSnapshot(s.store, txn.StartTS(), 0) m, err := snapshot.BatchGet(context.Background(), keys) c.Assert(err, IsNil) @@ -130,7 +130,7 @@ func (s *testSnapshotSuite) TestSnapshotCache(c *C) { c.Assert(txn.Commit(context.Background()), IsNil) txn = s.beginTxn(c) - snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: txn.StartTS()}, 0) + snapshot := newTiKVSnapshot(s.store, txn.StartTS(), 0) _, err := snapshot.BatchGet(context.Background(), []kv.Key{kv.Key("x"), kv.Key("y")}) c.Assert(err, IsNil) @@ -256,7 +256,7 @@ func (s *testSnapshotSuite) TestPointGetSkipTxnLock(c *C) { committer.lockTTL = 3000 c.Assert(committer.prewriteMutations(bo, committer.mutations), IsNil) - snapshot := newTiKVSnapshot(s.store, kv.MaxVersion, 0) + snapshot := newTiKVSnapshot(s.store, maxTimestamp, 0) start := time.Now() c.Assert(committer.primary(), BytesEquals, []byte(x)) // Point get secondary key. Shouldn't be blocked by the lock and read old data. @@ -268,7 +268,7 @@ func (s *testSnapshotSuite) TestPointGetSkipTxnLock(c *C) { committer.commitTS = txn.StartTS() + 1 committer.commitMutations(bo, committer.mutationsOfKeys([][]byte{committer.primary()})) - snapshot = newTiKVSnapshot(s.store, kv.MaxVersion, 0) + snapshot = newTiKVSnapshot(s.store, maxTimestamp, 0) start = time.Now() // Point get secondary key. Should read committed data. value, err := snapshot.Get(ctx, y) @@ -285,7 +285,7 @@ func (s *testSnapshotSuite) TestSnapshotThreadSafe(c *C) { err := txn.Commit(context.Background()) c.Assert(err, IsNil) - snapshot := newTiKVSnapshot(s.store, kv.MaxVersion, 0) + snapshot := newTiKVSnapshot(s.store, maxTimestamp, 0) var wg sync.WaitGroup wg.Add(5) for i := 0; i < 5; i++ { @@ -306,7 +306,7 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats(c *C) { reqStats := NewRegionRequestRuntimeStats() RecordRegionRequestRuntimeStats(reqStats.Stats, tikvrpc.CmdGet, time.Second) RecordRegionRequestRuntimeStats(reqStats.Stats, tikvrpc.CmdGet, time.Millisecond) - snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: 0}, 0) + snapshot := newTiKVSnapshot(s.store, 0, 0) snapshot.SetOption(kv.CollectRuntimeStats, &SnapshotRuntimeStats{}) snapshot.mergeRegionRequestStats(reqStats.Stats) snapshot.mergeRegionRequestStats(reqStats.Stats) diff --git a/store/tikv/split_test.go b/store/tikv/split_test.go index bf02b64405a6f..7076b72f6d59f 100644 --- a/store/tikv/split_test.go +++ b/store/tikv/split_test.go @@ -17,7 +17,6 @@ import ( "context" . "github.com/pingcap/check" - "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/mockstore/mocktikv" "github.com/pingcap/tidb/store/tikv/mockstore/cluster" ) @@ -67,7 +66,7 @@ func (s *testSplitSuite) TestSplitBatchGet(c *C) { c.Assert(err, IsNil) txn := s.begin(c) - snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: txn.StartTS()}, 0) + snapshot := newTiKVSnapshot(s.store, txn.StartTS(), 0) keys := [][]byte{{'a'}, {'b'}, {'c'}} _, region, err := s.store.regionCache.GroupKeysByRegion(s.bo, keys, nil) diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 77b610247a7f6..1c71663b5807c 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -80,8 +80,7 @@ func newTiKVTxn(store *KVStore, txnScope string) (*KVTxn, error) { // newTiKVTxnWithStartTS creates a txn with startTS. func newTiKVTxnWithStartTS(store *KVStore, txnScope string, startTS uint64, replicaReadSeed uint32) (*KVTxn, error) { - ver := kv.NewVersion(startTS) - snapshot := newTiKVSnapshot(store, ver, replicaReadSeed) + snapshot := newTiKVSnapshot(store, startTS, replicaReadSeed) newTiKVTxn := &KVTxn{ snapshot: snapshot, us: kv.NewUnionStore(snapshot), diff --git a/table/tables/tables.go b/table/tables/tables.go index 50c332e92bfcf..7dd3894898a52 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -1756,5 +1756,8 @@ func BuildTableScanFromInfos(tableInfo *model.TableInfo, columnInfos []*model.Co Columns: util.ColumnsToProto(columnInfos, tableInfo.PKIsHandle), PrimaryColumnIds: pkColIds, } + if tableInfo.IsCommonHandle { + tsExec.PrimaryPrefixColumnIds = PrimaryPrefixColumnIDs(tableInfo) + } return tsExec } diff --git a/types/parser_driver/value_expr.go b/types/parser_driver/value_expr.go index 27e9c9d2dd6ee..d3f358994f5be 100644 --- a/types/parser_driver/value_expr.go +++ b/types/parser_driver/value_expr.go @@ -103,6 +103,7 @@ func (n *ValueExpr) Restore(ctx *format.RestoreCtx) error { // This part is used to process flag HasStringWithoutDefaultCharset, which means if we have this flag and the // charset is mysql.DefaultCharset, we don't need to write the default. if n.Type.Charset != "" && + !ctx.Flags.HasStringWithoutCharset() && (!ctx.Flags.HasStringWithoutDefaultCharset() || n.Type.Charset != mysql.DefaultCharset) { ctx.WritePlain("_") ctx.WriteKeyWord(n.Type.Charset) diff --git a/util/parser/ast.go b/util/parser/ast.go index ded22800022cd..de460f24378fd 100644 --- a/util/parser/ast.go +++ b/util/parser/ast.go @@ -18,6 +18,8 @@ import ( "github.com/pingcap/parser/ast" "github.com/pingcap/parser/format" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" ) // GetDefaultDB checks if all columns in the AST have explicit DBName. If not, return specified DBName. @@ -115,6 +117,7 @@ func SimpleCases(node ast.StmtNode, defaultDB, origin string) (s string, ok bool } // RestoreWithDefaultDB returns restore strings for StmtNode with defaultDB +// This function is customized for SQL bind usage. func RestoreWithDefaultDB(node ast.StmtNode, defaultDB, origin string) string { if s, ok := SimpleCases(node, defaultDB, origin); ok { return s @@ -123,10 +126,12 @@ func RestoreWithDefaultDB(node ast.StmtNode, defaultDB, origin string) string { // Three flags for restore with default DB: // 1. RestoreStringSingleQuotes specifies to use single quotes to surround the string; // 2. RestoreSpacesAroundBinaryOperation specifies to add space around binary operation; - // 3. RestoreStringWithoutDefaultCharset specifies to not print default charset before string; - ctx := format.NewRestoreCtx(format.RestoreStringSingleQuotes|format.RestoreSpacesAroundBinaryOperation|format.RestoreStringWithoutDefaultCharset, &sb) + // 3. RestoreStringWithoutCharset specifies to not print charset before string; + // 4. RestoreNameBackQuotes specifies to use back quotes to surround the name; + ctx := format.NewRestoreCtx(format.RestoreStringSingleQuotes|format.RestoreSpacesAroundBinaryOperation|format.RestoreStringWithoutCharset|format.RestoreNameBackQuotes, &sb) ctx.DefaultDB = defaultDB if err := node.Restore(ctx); err != nil { + logutil.BgLogger().Debug("[sql-bind] restore SQL failed", zap.Error(err)) return "" } return sb.String()