diff --git a/executor/tiflash_test.go b/executor/tiflash_test.go index 5fd033c0796a5..e41e7f81f9eb7 100644 --- a/executor/tiflash_test.go +++ b/executor/tiflash_test.go @@ -148,6 +148,35 @@ func (s *tiflashTestSuite) TestReadUnsigedPK(c *C) { tk.MustQuery("select count(*) from t1 , t where t1.a = t.a and ((t1.a < 9223372036854775800 and t1.a > 2) or (t1.a <= 1 and t1.a > -1))").Check(testkit.Rows("3")) } +// to fix https://github.com/pingcap/tidb/issues/27952 +func (s *tiflashTestSuite) TestJoinRace(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int not null, b int not null)") + tk.MustExec("alter table t set tiflash replica 1") + tb := testGetTableByName(c, tk.Se, "test", "t") + err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true) + c.Assert(err, IsNil) + tk.MustExec("insert into t values(1,1)") + tk.MustExec("insert into t values(2,1)") + tk.MustExec("insert into t values(3,1)") + tk.MustExec("insert into t values(1,2)") + tk.MustExec("insert into t values(2,2)") + tk.MustExec("insert into t values(3,2)") + tk.MustExec("insert into t values(1,2)") + tk.MustExec("insert into t values(2,2)") + tk.MustExec("insert into t values(3,2)") + tk.MustExec("insert into t values(1,3)") + tk.MustExec("insert into t values(2,3)") + tk.MustExec("insert into t values(3,4)") + tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"") + tk.MustExec("set @@session.tidb_enforce_mpp=ON") + tk.MustExec("set @@tidb_opt_broadcast_cartesian_join=0") + tk.MustQuery("select count(*) from (select count(a) x from t group by b) t1 join (select count(a) x from t group by b) t2 on t1.x > t2.x").Check(testkit.Rows("6")) + +} + func (s *tiflashTestSuite) TestMppExecution(c *C) { if israce.RaceEnabled { c.Skip("skip race test because of long running") diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index f22e9c1a13cf5..041d98ec5c30e 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -417,6 +417,7 @@ type SessionVars struct { // mppTaskIDAllocator is used to allocate mpp task id for a session. mppTaskIDAllocator struct { + mu sync.Mutex lastTS uint64 taskID int64 } @@ -887,6 +888,8 @@ type SessionVars struct { // AllocMPPTaskID allocates task id for mpp tasks. It will reset the task id if the query's // startTs is different. func (s *SessionVars) AllocMPPTaskID(startTS uint64) int64 { + s.mppTaskIDAllocator.mu.Lock() + defer s.mppTaskIDAllocator.mu.Unlock() if s.mppTaskIDAllocator.lastTS == startTS { s.mppTaskIDAllocator.taskID++ return s.mppTaskIDAllocator.taskID