Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

execution: commit the transaction before responding explain analyze results to the client (#38044) #38441

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,7 @@ func (a *ExecStmt) handleNoDelay(ctx context.Context, e Executor, isPessimistic
if analyze := explain.getAnalyzeExecToExecutedNoDelay(); analyze != nil {
toCheck = analyze
isExplainAnalyze = true
a.Ctx.GetSessionVars().StmtCtx.IsExplainAnalyzeDML = isExplainAnalyze
}
}

Expand Down
13 changes: 13 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1843,6 +1843,19 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
sc.EnableOptimizeTrace = false
sc.OptimizeTracer = nil
sc.OptimizerCETrace = nil
<<<<<<< HEAD
=======
sc.StatsLoadStatus = make(map[model.TableItemID]string)
sc.IsSyncStatsFailed = false
sc.IsExplainAnalyzeDML = false
// Firstly we assume that UseDynamicPruneMode can be enabled according session variable, then we will check other conditions
// in PlanBuilder.buildDataSource
if ctx.GetSessionVars().IsDynamicPartitionPruneEnabled() {
sc.UseDynamicPruneMode = true
} else {
sc.UseDynamicPruneMode = false
}
>>>>>>> b0e073478f (execution: commit the transaction before responding explain analyze results to the client (#38044))

sc.SysdateIsNow = ctx.GetSessionVars().SysdateIsNow

Expand Down
296 changes: 296 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7224,3 +7224,299 @@ func TestIssue36609(t *testing.T) {
tk.MustQuery("select * from t3 straight_join t4 on t3.a = t4.b straight_join t2 on t3.d = t2.c straight_join t1 on t1.a = t2.b straight_join t5 on t4.c = t5.d where t2.b < 100 and t4.a = 10;")
tk.MustQuery("select * from information_schema.statements_summary;")
}
<<<<<<< HEAD
=======

func TestHexIntOrStrPushDownToTiFlash(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b varchar(10));")
tk.MustExec("insert into t values(1, 'tiflash');")
tk.MustExec("set @@tidb_allow_mpp=1; set @@tidb_enforce_mpp=1;")
tk.MustExec("set @@tidb_isolation_read_engines = 'tiflash'")

tbl, err := dom.InfoSchema().TableByName(model.CIStr{O: "test", L: "test"}, model.CIStr{O: "t", L: "t"})
require.NoError(t, err)
// Set the hacked TiFlash replica for explain tests.
tbl.Meta().TiFlashReplica = &model.TiFlashReplicaInfo{Count: 1, Available: true}

rows := [][]interface{}{
{"TableReader_9", "root", "data:ExchangeSender_8"},
{"└─ExchangeSender_8", "mpp[tiflash]", "ExchangeType: PassThrough"},
{" └─Projection_4", "mpp[tiflash]", "hex(test.t.a)->Column#4"},
{" └─TableFullScan_7", "mpp[tiflash]", "keep order:false, stats:pseudo"},
}
tk.MustQuery("explain select hex(a) from t;").CheckAt([]int{0, 2, 4}, rows)

rows = [][]interface{}{
{"TableReader_9", "root", "data:ExchangeSender_8"},
{"└─ExchangeSender_8", "mpp[tiflash]", "ExchangeType: PassThrough"},
{" └─Projection_4", "mpp[tiflash]", "hex(test.t.b)->Column#4"},
{" └─TableFullScan_7", "mpp[tiflash]", "keep order:false, stats:pseudo"},
}
tk.MustQuery("explain select hex(b) from t;").CheckAt([]int{0, 2, 4}, rows)
}

func TestEltPushDownToTiFlash(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b varchar(20))")
tk.MustExec("insert into t values(2147483647, '32')")
tk.MustExec("insert into t values(12, 'abc')")
tk.MustExec("set @@tidb_allow_mpp=1; set @@tidb_enforce_mpp=1")
tk.MustExec("set @@tidb_isolation_read_engines = 'tiflash'")

// Create virtual tiflash replica info.
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
require.True(t, exists)
for _, tblInfo := range db.Tables {
if tblInfo.Name.L == "t" {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}

rows := [][]interface{}{
{"TableReader_9", "root", "data:ExchangeSender_8"},
{"└─ExchangeSender_8", "mpp[tiflash]", "ExchangeType: PassThrough"},
{" └─Projection_4", "mpp[tiflash]", "elt(test.t.a, test.t.b)->Column#4"},
{" └─TableFullScan_7", "mpp[tiflash]", "keep order:false, stats:pseudo"},
}
tk.MustQuery("explain select elt(a, b) from t;").CheckAt([]int{0, 2, 4}, rows)
}

func TestCastTimeAsDurationToTiFlash(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a date, b datetime(4))")
tk.MustExec("insert into t values('2021-10-26', '2021-10-26')")
tk.MustExec("insert into t values('2021-10-26', '2021-10-26 11:11:11')")
tk.MustExec("insert into t values('2021-10-26', '2021-10-26 11:11:11.111111')")
tk.MustExec("insert into t values('2021-10-26', '2021-10-26 11:11:11.123456')")
tk.MustExec("insert into t values('2021-10-26', '2021-10-26 11:11:11.999999')")

tk.MustExec("set @@tidb_allow_mpp=1; set @@tidb_enforce_mpp=1")
tk.MustExec("set @@tidb_isolation_read_engines = 'tiflash'")

// Create virtual tiflash replica info.
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
require.True(t, exists)
for _, tblInfo := range db.Tables {
if tblInfo.Name.L == "t" {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}

rows := [][]interface{}{
{"TableReader_9", "root", "data:ExchangeSender_8"},
{"└─ExchangeSender_8", "mpp[tiflash]", "ExchangeType: PassThrough"},
{" └─Projection_4", "mpp[tiflash]", "cast(test.t.a, time BINARY)->Column#4, cast(test.t.b, time BINARY)->Column#5"},
{" └─TableFullScan_7", "mpp[tiflash]", "keep order:false, stats:pseudo"},
}
tk.MustQuery("explain select cast(a as time), cast(b as time) from t;").CheckAt([]int{0, 2, 4}, rows)
}

func TestPartitionTableFallBackStatic(t *testing.T) {
store, _ := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set @@tidb_partition_prune_mode='static'")
tk.MustExec("CREATE TABLE t (a int) PARTITION BY RANGE (a) (PARTITION p0 VALUES LESS THAN (6),PARTITION p1 VALUES LESS THAN (11));")
tk.MustExec("insert into t values (1),(2),(3),(4),(7),(8),(9),(10)")
tk.MustExec("analyze table t")

// use static plan in static mode
rows := [][]interface{}{
{"PartitionUnion", "", ""},
{"├─TableReader", "", "data:TableFullScan"},
{"│ └─TableFullScan", "table:t, partition:p0", "keep order:false"},
{"└─TableReader", "", "data:TableFullScan"},
{" └─TableFullScan", "table:t, partition:p1", "keep order:false"},
}
tk.MustQuery("explain format='brief' select * from t").CheckAt([]int{0, 3, 4}, rows)

tk.MustExec("CREATE TABLE t2 (a int) PARTITION BY RANGE (a) (PARTITION p0 VALUES LESS THAN (6),PARTITION p1 VALUES LESS THAN (11));")
tk.MustExec("insert into t2 values (1),(2),(3),(4),(7),(8),(9),(10)")
tk.MustExec("analyze table t2")
tk.MustExec("set @@tidb_partition_prune_mode='dynamic'")

// use static plan in dynamic mode due to having not global stats
tk.MustQuery("explain format='brief' select * from t").CheckAt([]int{0, 3, 4}, rows)
tk.MustExec("analyze table t")

// use dynamic plan in dynamic mode with global stats
rows = [][]interface{}{
{"TableReader", "partition:all", "data:TableFullScan"},
{"└─TableFullScan", "table:t", "keep order:false"},
}
tk.MustQuery("explain format='brief' select * from t").CheckAt([]int{0, 3, 4}, rows)

rows = [][]interface{}{
{"Union", "", ""},
{"├─PartitionUnion", "", ""},
{"│ ├─TableReader", "", "data:TableFullScan"},
{"│ │ └─TableFullScan", "table:t, partition:p0", "keep order:false"},
{"│ └─TableReader", "", "data:TableFullScan"},
{"│ └─TableFullScan", "table:t, partition:p1", "keep order:false"},
{"└─PartitionUnion", "", ""},
{" ├─TableReader", "", "data:TableFullScan"},
{" │ └─TableFullScan", "table:t2, partition:p0", "keep order:false"},
{" └─TableReader", "", "data:TableFullScan"},
{" └─TableFullScan", "table:t2, partition:p1", "keep order:false"},
}
// use static plan in dynamic mode due to t2 has no global stats
tk.MustQuery("explain format='brief' select * from t union all select * from t2;").CheckAt([]int{0, 3, 4}, rows)
}

func TestEnableTiFlashReadForWriteStmt(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int)")
tk.MustExec("insert into t values(1, 2)")
tk.MustExec("drop table if exists t2")
tk.MustExec("create table t2(a int)")
tk.MustExec("set @@tidb_allow_mpp=1; set @@tidb_enforce_mpp=1")
tk.MustExec("set @@tidb_isolation_read_engines = 'tiflash'")
tk.MustExec("set @@tidb_enable_tiflash_read_for_write_stmt = ON")

tbl, err := dom.InfoSchema().TableByName(model.CIStr{O: "test", L: "test"}, model.CIStr{O: "t", L: "t"})
require.NoError(t, err)
// Set the hacked TiFlash replica for explain tests.
tbl.Meta().TiFlashReplica = &model.TiFlashReplicaInfo{Count: 1, Available: true}

tbl2, err := dom.InfoSchema().TableByName(model.CIStr{O: "test", L: "test"}, model.CIStr{O: "t2", L: "t2"})
require.NoError(t, err)
// Set the hacked TiFlash replica for explain tests.
tbl2.Meta().TiFlashReplica = &model.TiFlashReplicaInfo{Count: 1, Available: true}

checkMpp := func(r [][]interface{}) {
check := false
for i := range r {
if r[i][2] == "mpp[tiflash]" {
check = true
break
}
}
require.Equal(t, check, true)
}

// Insert into ... select
rs := tk.MustQuery("explain insert into t2 select a+b from t").Rows()
checkMpp(rs)

rs = tk.MustQuery("explain insert into t2 select t.a from t2 join t on t2.a = t.a").Rows()
checkMpp(rs)

// Replace into ... select
rs = tk.MustQuery("explain replace into t2 select a+b from t").Rows()
checkMpp(rs)

// CTE
rs = tk.MustQuery("explain update t set a=a+1 where b in (select a from t2 where t.a > t2.a)").Rows()
checkMpp(rs)
}

func TestTableRangeFallback(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2")
tk.MustExec("create table t1 (a int primary key, b int)")
tk.MustExec("create table t2 (c int)")
tk.MustQuery("explain format='brief' select * from t1 where a in (10, 20, 30, 40, 50) and b > 1").Check(testkit.Rows(
"Selection 1.67 root gt(test.t1.b, 1)",
"└─Batch_Point_Get 5.00 root table:t1 handle:[10 20 30 40 50], keep order:false, desc:false"))
tk.MustQuery("explain format='brief' select * from t1 join t2 on t1.b = t2.c where t1.a in (10, 20, 30, 40, 50)").Check(testkit.Rows(
"HashJoin 6.24 root inner join, equal:[eq(test.t1.b, test.t2.c)]",
"├─Selection(Build) 5.00 root not(isnull(test.t1.b))",
"│ └─Batch_Point_Get 5.00 root table:t1 handle:[10 20 30 40 50], keep order:false, desc:false",
"└─TableReader(Probe) 9990.00 root data:Selection",
" └─Selection 9990.00 cop[tikv] not(isnull(test.t2.c))",
" └─TableFullScan 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo"))
tk.MustExec("set @@tidb_opt_range_max_size=10")
tk.MustQuery("explain format='brief' select * from t1 where a in (10, 20, 30, 40, 50) and b > 1").Check(testkit.Rows(
"TableReader 8000.00 root data:Selection",
"└─Selection 8000.00 cop[tikv] gt(test.t1.b, 1), in(test.t1.a, 10, 20, 30, 40, 50)",
" └─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo"))
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 Memory capacity of 10 bytes for 'tidb_opt_range_max_size' exceeded when building ranges. Less accurate ranges such as full range are chosen"))
tk.MustQuery("explain format='brief' select * from t1 join t2 on t1.b = t2.c where t1.a in (10, 20, 30, 40, 50)").Check(testkit.Rows(
"HashJoin 10000.00 root inner join, equal:[eq(test.t1.b, test.t2.c)]",
"├─TableReader(Build) 8000.00 root data:Selection",
"│ └─Selection 8000.00 cop[tikv] not(isnull(test.t2.c))",
"│ └─TableFullScan 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo",
"└─TableReader(Probe) 8000.00 root data:Selection",
" └─Selection 8000.00 cop[tikv] in(test.t1.a, 10, 20, 30, 40, 50), not(isnull(test.t1.b))",
" └─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo"))
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 Memory capacity of 10 bytes for 'tidb_opt_range_max_size' exceeded when building ranges. Less accurate ranges such as full range are chosen"))
}

func TestPlanCacheForTableRangeFallback(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("set @@tidb_enable_prepared_plan_cache=1")
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int primary key, b int)")
tk.MustExec("set @@tidb_opt_range_max_size=10")
tk.MustExec("prepare stmt from 'select * from t where a in (?, ?, ?, ?, ?) and b > 1'")
tk.MustExec("set @a=10, @b=20, @c=30, @d=40, @e=50")
tk.MustExec("execute stmt using @a, @b, @c, @d, @e")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 Memory capacity of 10 bytes for 'tidb_opt_range_max_size' exceeded when building ranges. Less accurate ranges such as full range are chosen"))
tk.MustExec("execute stmt using @a, @b, @c, @d, @e")
// The plan with range fallback is not cached.
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
}

func TestIssue37760(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int primary key)")
tk.MustExec("insert into t values (2), (4), (6)")
tk.MustExec("set @@tidb_opt_range_max_size=1")
tk.MustQuery("select * from t where a").Check(testkit.Rows("2", "4", "6"))
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 Memory capacity of 1 bytes for 'tidb_opt_range_max_size' exceeded when building ranges. Less accurate ranges such as full range are chosen"))
}

// TestExplainAnalyzeDMLCommit covers the issue #37373.
func TestExplainAnalyzeDMLCommit(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (c1 int key, c2 int);")
tk.MustExec("insert into t values (1, 1)")

err := failpoint.Enable("github.com/pingcap/tidb/session/mockSleepBeforeTxnCommit", "return(500)")
require.NoError(t, err)
defer func() {
_ = failpoint.Disable("github.com/pingcap/tidb/session/mockSleepBeforeTxnCommit")
}()
// The commit is paused by the failpoint, after the fix the explain statement
// execution should proceed after the commit finishes.
_, err = tk.Exec("explain analyze delete from t;")
require.NoError(t, err)
tk.MustQuery("select * from t").Check(testkit.Rows())
}
>>>>>>> b0e073478f (execution: commit the transaction before responding explain analyze results to the client (#38044))
12 changes: 12 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,10 @@ func (s *session) commitTxnWithTemporaryData(ctx context.Context, txn kv.Transac
sessVars := s.sessionVars
txnTempTables := sessVars.TxnCtx.TemporaryTables
if len(txnTempTables) == 0 {
failpoint.Inject("mockSleepBeforeTxnCommit", func(v failpoint.Value) {
ms := v.(int)
time.Sleep(time.Millisecond * time.Duration(ms))
})
return txn.Commit(ctx)
}

Expand Down Expand Up @@ -2127,6 +2131,14 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec.
se.updateTelemetryMetric(s.(*executor.ExecStmt))
sessVars.TxnCtx.StatementCount++
if rs != nil {
if se.GetSessionVars().StmtCtx.IsExplainAnalyzeDML {
if !sessVars.InTxn() {
se.StmtCommit()
if err := se.CommitTxn(ctx); err != nil {
return nil, err
}
}
}
return &execStmtResult{
RecordSet: rs,
sql: s,
Expand Down
20 changes: 20 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,26 @@ type StatementContext struct {
IsSQLRegistered atomic2.Bool
// IsSQLAndPlanRegistered uses to indicate whether the SQL and plan has been registered for TopSQL.
IsSQLAndPlanRegistered atomic2.Bool
<<<<<<< HEAD
=======
// IsReadOnly uses to indicate whether the SQL is read-only.
IsReadOnly bool
// StatsLoadStatus records StatsLoadedStatus for the index/column which is used in query
StatsLoadStatus map[model.TableItemID]string
// IsSyncStatsFailed indicates whether any failure happened during sync stats
IsSyncStatsFailed bool
// UseDynamicPruneMode indicates whether use UseDynamicPruneMode in query stmt
UseDynamicPruneMode bool
// ColRefFromPlan mark the column ref used by assignment in update statement.
ColRefFromUpdatePlan []int64

// RangeFallback indicates that building complete ranges exceeds the memory limit so it falls back to less accurate ranges such as full range.
RangeFallback bool

// IsExplainAnalyzeDML is true if the statement is "explain analyze DML executors", before responding the explain
// results to the client, the transaction should be committed first. See issue #37373 for more details.
IsExplainAnalyzeDML bool
>>>>>>> b0e073478f (execution: commit the transaction before responding explain analyze results to the client (#38044))
}

// StmtHints are SessionVars related sql hints.
Expand Down