Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: fix the lock behaviours when for update is used outside transactions | tidb-test=pr/2365 #54694

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,8 +581,13 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
isPessimistic := sctx.GetSessionVars().TxnCtx.IsPessimistic

// Special handle for "select for update statement" in pessimistic transaction.
if isPessimistic && a.isSelectForUpdate {
return a.handlePessimisticSelectForUpdate(ctx, e)
if a.isSelectForUpdate {
if sctx.GetSessionVars().UseLowResolutionTSO() {
return nil, errors.New("can not execute select for update statement when 'tidb_low_resolution_tso' is set")
}
Comment on lines +585 to +587
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. When tidb_low_resolution_tso is enabled, the in-transction select for update should be rejected.

Is here sure to be in transaction?

if isPessimistic {
return a.handlePessimisticSelectForUpdate(ctx, e)
}
}

a.prepareFKCascadeContext(e)
Expand Down Expand Up @@ -982,8 +987,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e exec.Executor) (
if snapshotTS != 0 {
return nil, errors.New("can not execute write statement when 'tidb_snapshot' is set")
}
lowResolutionTSO := sctx.GetSessionVars().LowResolutionTSO
if lowResolutionTSO {
if sctx.GetSessionVars().UseLowResolutionTSO() {
return nil, errors.New("can not execute write statement when 'tidb_low_resolution_tso' is set")
}
}
Expand Down
45 changes: 42 additions & 3 deletions pkg/executor/test/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1886,21 +1886,60 @@ func TestLowResolutionTSORead(t *testing.T) {
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@autocommit=1")
tk.MustExec("use test")
tk.MustExec("create table low_resolution_tso(a int)")
tk.MustExec("create table low_resolution_tso(a int key)")
tk.MustExec("insert low_resolution_tso values (1)")

// enable low resolution tso
require.False(t, tk.Session().GetSessionVars().LowResolutionTSO)
tk.MustExec("set @@tidb_low_resolution_tso = 'on'")
require.True(t, tk.Session().GetSessionVars().LowResolutionTSO)

time.Sleep(3 * time.Second)
tk.MustQuery("select * from low_resolution_tso").Check(testkit.Rows("1"))
tk.MustQuery("select * from low_resolution_tso")
err := tk.ExecToErr("update low_resolution_tso set a = 2")
require.Error(t, err)
tk.MustExec("set @@tidb_low_resolution_tso = 'off'")
tk.MustExec("update low_resolution_tso set a = 2")
tk.MustQuery("select * from low_resolution_tso").Check(testkit.Rows("2"))

// Test select for update could not be executed when `tidb_low_resolution_tso` is enabled.
type testCase struct {
optimistic bool
pointGet bool
}
cases := []testCase{
{true, true},
{true, false},
{false, true},
{false, false},
}
tk.MustExec("set @@tidb_low_resolution_tso = 'on'")
for _, test := range cases {
if test.optimistic {
tk.MustExec("begin optimistic")
} else {
tk.MustExec("begin")
}
var err error
if test.pointGet {
err = tk.ExecToErr("select * from low_resolution_tso where a = 1 for update")
} else {
err = tk.ExecToErr("select * from low_resolution_tso for update")
}
require.Error(t, err)
tk.MustExec("rollback")
}
tk.MustQuery("select * from low_resolution_tso for update")
tk.MustQuery("select * from low_resolution_tso where a = 1 for update")

origPessimisticAutoCommit := config.GetGlobalConfig().PessimisticTxn.PessimisticAutoCommit.Load()
config.GetGlobalConfig().PessimisticTxn.PessimisticAutoCommit.Store(true)
defer func() {
config.GetGlobalConfig().PessimisticTxn.PessimisticAutoCommit.Store(origPessimisticAutoCommit)
}()
err = tk.ExecToErr("select * from low_resolution_tso where a = 1 for update")
require.Error(t, err)
err = tk.ExecToErr("select * from low_resolution_tso for update")
require.Error(t, err)
}

func TestAdapterStatement(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/casetest/dag/testdata/plan_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@
"Cases": [
{
"SQL": "select * from t order by b limit 1 for update",
"Best": "TableReader(Table(t)->TopN([test.t.b],0,1))->TopN([test.t.b],0,1)->Lock",
"Best": "TableReader(Table(t)->TopN([test.t.b],0,1))->TopN([test.t.b],0,1)",
"Hints": "use_index(@`sel_1` `test`.`t` ), no_order_index(@`sel_1` `test`.`t` `primary`), limit_to_cop(@`sel_1`)"
},
{
Expand Down
9 changes: 6 additions & 3 deletions pkg/planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3903,9 +3903,12 @@ func (b *PlanBuilder) buildSelect(ctx context.Context, sel *ast.SelectStmt) (p b
}
b.ctx.GetSessionVars().StmtCtx.LockTableIDs[tName.TableInfo.ID] = struct{}{}
}
p, err = b.buildSelectLock(p, l)
if err != nil {
return nil, err
lock, _ := getLockWaitTime(b.ctx, l)
if lock {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If !lock, is the above for loop still necessary?

p, err = b.buildSelectLock(p, l)
if err != nil {
return nil, err
}
}
}
b.handleHelper.popMap()
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/testdata/plan_suite_unexported_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
{
"Name": "TestPlanBuilder",
"Cases": [
"DataScan(t)->Lock->Projection",
"DataScan(t)->Projection",
"TableReader(Table(t)->Limit)->Limit->Update",
"TableReader(Table(t)->Limit)->Limit->Delete",
"*core.Explain",
Expand Down
9 changes: 9 additions & 0 deletions pkg/sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,7 @@ type SessionVars struct {
TxnMode string

// LowResolutionTSO is used for reading data with low resolution TSO which is updated once every two seconds.
// Do not use it directly, use the `UseLowResolutionTSO` method below.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we unexport it?

LowResolutionTSO bool

// MaxExecutionTime is the timeout for select statement, in milliseconds.
Expand Down Expand Up @@ -3875,3 +3876,11 @@ const (
func (s *SessionVars) GetOptObjective() string {
return s.OptObjective
}

// UseLowResolutionTSO indicates whether low resolution tso could be used for execution.
func (s *SessionVars) UseLowResolutionTSO() bool {
if !s.InRestrictedSQL && s.LowResolutionTSO {
cfzjywxk marked this conversation as resolved.
Show resolved Hide resolved
return true
}
return false
}
2 changes: 1 addition & 1 deletion pkg/sessiontxn/isolation/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ func newOracleFuture(ctx context.Context, sctx sessionctx.Context, scope string)
oracleStore := sctx.GetStore().GetOracle()
option := &oracle.Option{TxnScope: scope}

if sctx.GetSessionVars().LowResolutionTSO {
if sctx.GetSessionVars().UseLowResolutionTSO() {
return oracleStore.GetLowResolutionTimestampAsync(ctx, option)
}
return oracleStore.GetTimestampAsync(ctx, option)
Expand Down
32 changes: 27 additions & 5 deletions tests/integrationtest/r/executor/distsql.result
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,15 @@ a b c
15 15 15
drop table if exists t;
create table t(a int, b int, index k(b)) PARTITION BY HASH(a) partitions 4;
begin;
insert into t(a, b) values (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8);
explain format='brief' select b from t use index(k) where b > 2 order by b limit 1 for update;
id estRows task access object operator info
Projection 1.00 root executor__distsql.t.b
└─SelectLock 1.00 root for update 0
└─Limit 1.00 root offset:0, count:1
└─IndexReader 1.00 root partition:all index:Limit
└─Limit 1.00 cop[tikv] offset:0, count:1
└─UnionScan 1.00 root gt(executor__distsql.t.b, 2)
└─IndexReader 1.00 root partition:all index:IndexRangeScan
└─IndexRangeScan 1.00 cop[tikv] table:t, index:k(b) range:(2,+inf], keep order:true, stats:pseudo
select b from t use index(k) where b > 2 order by b limit 1 for update;
b
Expand All @@ -171,9 +172,30 @@ id estRows task access object operator info
Projection 1.00 root executor__distsql.t.b
└─SelectLock 1.00 root for update 0
└─Limit 1.00 root offset:0, count:1
└─IndexReader 1.00 root partition:all index:Limit
└─Limit 1.00 cop[tikv] offset:0, count:1
└─IndexRangeScan 1.00 cop[tikv] table:t, index:k(b) range:(2,+inf], keep order:true
└─UnionScan 1.00 root gt(executor__distsql.t.b, 2)
└─IndexReader 1.00 root partition:all index:IndexRangeScan
└─IndexRangeScan 1.00 cop[tikv] table:t, index:k(b) range:(2,+inf], keep order:true, stats:pseudo
select b from t use index(k) where b > 2 order by b limit 1 for update;
b
3
rollback;
insert into t(a, b) values (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8);
explain format='brief' select b from t use index(k) where b > 2 order by b limit 1 for update;
id estRows task access object operator info
Limit 1.00 root offset:0, count:1
└─IndexReader 1.00 root partition:all index:Limit
└─Limit 1.00 cop[tikv] offset:0, count:1
└─IndexRangeScan 1.00 cop[tikv] table:t, index:k(b) range:(2,+inf], keep order:true, stats:pseudo
select b from t use index(k) where b > 2 order by b limit 1 for update;
b
3
analyze table t;
explain format='brief' select b from t use index(k) where b > 2 order by b limit 1 for update;
id estRows task access object operator info
Limit 1.00 root offset:0, count:1
└─IndexReader 1.00 root partition:all index:Limit
└─Limit 1.00 cop[tikv] offset:0, count:1
└─IndexRangeScan 1.00 cop[tikv] table:t, index:k(b) range:(2,+inf], keep order:true
select b from t use index(k) where b > 2 order by b limit 1 for update;
b
3
12 changes: 12 additions & 0 deletions tests/integrationtest/r/index_merge.result
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ update t1 set c1 = 100, c2 = 100, c3 = 100 where c1 in (select /*+ use_index_mer
select * from t1;
c1 c2 c3
///// FOR UPDATE
begin;
explain select /*+ use_index_merge(t1) */ * from t1 where c1 < 10 or c2 < 10 and c3 < 10 order by 1 for update;
id estRows task access object operator info
Sort_6 4060.74 root index_merge.t1.c1
Expand All @@ -431,6 +432,17 @@ Sort_6 4060.74 root index_merge.t1.c1
└─TableRowIDScan_12 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo
select /*+ use_index_merge(t1) */ * from t1 where c1 < 10 or c2 < 10 and c3 < 10 order by 1 for update;
c1 c2 c3
rollback;
explain select /*+ use_index_merge(t1) */ * from t1 where c1 < 10 or c2 < 10 and c3 < 10 order by 1 for update;
id estRows task access object operator info
Sort_5 4060.74 root index_merge.t1.c1
└─IndexMerge_12 2250.55 root type: union
├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo
├─IndexRangeScan_9(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo
└─Selection_11(Probe) 2250.55 cop[tikv] or(lt(index_merge.t1.c1, 10), and(lt(index_merge.t1.c2, 10), lt(index_merge.t1.c3, 10)))
└─TableRowIDScan_10 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo
select /*+ use_index_merge(t1) */ * from t1 where c1 < 10 or c2 < 10 and c3 < 10 order by 1 for update;
c1 c2 c3
///// TEMPORARY Table. Not support for now.
drop table if exists t1;
create temporary table t1(c1 int, c2 int, c3 int, key(c1), key(c2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -694,13 +694,20 @@ CTE_0 2.00 root Recursive CTE
├─Projection(Seed Part) 1.00 root 1->Column#2
│ └─TableDual 1.00 root rows:1
└─CTETable(Recursive Part) 1.00 root Scan on CTE_0
begin;
explain format='brief' with x as (select * from (select a from t for update) s) select * from x where a = 1;
id estRows task access object operator info
Projection 10.00 root planner__core__casetest__physicalplantest__physical_plan.t.a
└─SelectLock 10.00 root for update 0
└─TableReader 10.00 root data:Selection
└─Selection 10.00 cop[tikv] eq(planner__core__casetest__physicalplantest__physical_plan.t.a, 1)
└─TableFullScan 10000.00 cop[tikv] table:t keep order:false, stats:pseudo
rollback;
explain format='brief' with x as (select * from (select a from t for update) s) select * from x where a = 1;
id estRows task access object operator info
TableReader 10.00 root data:Selection
└─Selection 10.00 cop[tikv] eq(planner__core__casetest__physicalplantest__physical_plan.t.a, 1)
└─TableFullScan 10000.00 cop[tikv] table:t keep order:false, stats:pseudo
set tidb_opt_force_inline_cte=1; -- enable force inline CTE;
explain format='brief' with cte as (select 1) select * from cte union select * from cte; -- force inline cte while multi-consumer;
id estRows task access object operator info
Expand Down
9 changes: 7 additions & 2 deletions tests/integrationtest/r/planner/core/point_get_plan.result
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,16 @@ explain format='brief' select count(_tidb_rowid) from t where a=1;
id estRows task access object operator info
StreamAgg 1.00 root funcs:count(planner__core__point_get_plan.t._tidb_rowid)->Column#4
└─Point_Get 1.00 root table:t, index:idx(a)
begin;
explain format='brief' select *, date_format(b, "") from t where a =1 for update;
id estRows task access object operator info
Projection 1.00 root planner__core__point_get_plan.t.a, planner__core__point_get_plan.t.b, date_format(cast(planner__core__point_get_plan.t.b, datetime BINARY), )->Column#4
└─Point_Get 1.00 root table:t, index:idx(a) lock
rollback;
explain format='brief' select *, date_format(b, "") from t where a =1 for update;
id estRows task access object operator info
Projection 1.00 root planner__core__point_get_plan.t.a, planner__core__point_get_plan.t.b, date_format(cast(planner__core__point_get_plan.t.b, datetime BINARY), )->Column#4
└─SelectLock 1.00 root for update 0
└─Point_Get 1.00 root table:t, index:idx(a)
└─Point_Get 1.00 root table:t, index:idx(a)
create table t1 (pk int, a int, b int, primary key(pk), unique key(a));
explain format='brief' select t1.*, _tidb_rowid from t1 where a = 1;
Error 1054 (42S22): Unknown column '_tidb_rowid' in 'field list'
Expand Down
9 changes: 8 additions & 1 deletion tests/integrationtest/t/executor/distsql.test
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,17 @@ select * from tbl use index(idx_a) where a > 10 order by a asc limit 4,1;
# TestIndexLookUpWithSelectForUpdateOnPartitionTable
drop table if exists t;
create table t(a int, b int, index k(b)) PARTITION BY HASH(a) partitions 4;
begin;
insert into t(a, b) values (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8);
explain format='brief' select b from t use index(k) where b > 2 order by b limit 1 for update;
select b from t use index(k) where b > 2 order by b limit 1 for update;
analyze table t;
explain format='brief' select b from t use index(k) where b > 2 order by b limit 1 for update;
select b from t use index(k) where b > 2 order by b limit 1 for update;
rollback;
insert into t(a, b) values (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8);
explain format='brief' select b from t use index(k) where b > 2 order by b limit 1 for update;
select b from t use index(k) where b > 2 order by b limit 1 for update;
analyze table t;
explain format='brief' select b from t use index(k) where b > 2 order by b limit 1 for update;
select b from t use index(k) where b > 2 order by b limit 1 for update;

6 changes: 5 additions & 1 deletion tests/integrationtest/t/index_merge.test
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,12 @@ update t1 set c1 = 100, c2 = 100, c3 = 100 where c1 in (select /*+ use_index_mer
select * from t1;

--echo ///// FOR UPDATE
begin;
explain select /*+ use_index_merge(t1) */ * from t1 where c1 < 10 or c2 < 10 and c3 < 10 order by 1 for update;
select /*+ use_index_merge(t1) */ * from t1 where c1 < 10 or c2 < 10 and c3 < 10 order by 1 for update;
select /*+ use_index_merge(t1) */ * from t1 where c1 < 10 or c2 < 10 and c3 < 10 order by 1 for update;
rollback;
explain select /*+ use_index_merge(t1) */ * from t1 where c1 < 10 or c2 < 10 and c3 < 10 order by 1 for update;
select /*+ use_index_merge(t1) */ * from t1 where c1 < 10 or c2 < 10 and c3 < 10 order by 1 for update;

--echo ///// TEMPORARY Table. Not support for now.
drop table if exists t1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ explain format='brief' with cte1 as (select 1), cte2 as (with cte3 as (select *
explain format='brief' with cte1 as (select 1), cte2 as (with cte3 as (select * from cte1) select * from cte3) select * from cte2; -- inline cte1, cte2, cte3;
explain format='brief' with cte1 as (select 1), cte2 as (select * from cte1) select * from cte2 a, cte2 b; -- inline cte1, cannot be inlined cte2;
explain format='brief' with recursive cte(a) as (select 1 union select a from cte) select * from cte; -- recursive cte cannot be inlined;
begin;
explain format='brief' with x as (select * from (select a from t for update) s) select * from x where a = 1;
rollback;
explain format='brief' with x as (select * from (select a from t for update) s) select * from x where a = 1;
set tidb_opt_force_inline_cte=1; -- enable force inline CTE;
explain format='brief' with cte as (select 1) select * from cte union select * from cte; -- force inline cte while multi-consumer;
Expand Down Expand Up @@ -1005,4 +1008,4 @@ insert into tbl_43 values("BCmuENPHzSOIMJLPB"),("LDOdXZYpOR"),("R"),("TloTqcHhdg
explain format = 'brief' select min(col_304) from (select /*+ use_index_merge( tbl_43 ) */ * from tbl_43 where not( tbl_43.col_304 between 'YEpfYfPVvhMlHGHSMKm' and 'PE' ) or tbl_43.col_304 in ( 'LUBGzGMA' ) and tbl_43.col_304 between 'HpsjfuSReCwBoh' and 'fta' or not( tbl_43.col_304 between 'MFWmuOsoyDv' and 'TSeMYpDXnFIyp' ) order by col_304) x;
select min(col_304) from (select /*+ use_index_merge( tbl_43 ) */ * from tbl_43 where not( tbl_43.col_304 between 'YEpfYfPVvhMlHGHSMKm' and 'PE' ) or tbl_43.col_304 in ( 'LUBGzGMA' ) and tbl_43.col_304 between 'HpsjfuSReCwBoh' and 'fta' or not( tbl_43.col_304 between 'MFWmuOsoyDv' and 'TSeMYpDXnFIyp' ) order by col_304) x;
explain format = 'brief' select max(col_304) from (select /*+ use_index_merge( tbl_43 ) */ * from tbl_43 where not( tbl_43.col_304 between 'YEpfYfPVvhMlHGHSMKm' and 'PE' ) or tbl_43.col_304 in ( 'LUBGzGMA' ) and tbl_43.col_304 between 'HpsjfuSReCwBoh' and 'fta' or not( tbl_43.col_304 between 'MFWmuOsoyDv' and 'TSeMYpDXnFIyp' ) order by col_304) x;
select max(col_304) from (select /*+ use_index_merge( tbl_43 ) */ * from tbl_43 where not( tbl_43.col_304 between 'YEpfYfPVvhMlHGHSMKm' and 'PE' ) or tbl_43.col_304 in ( 'LUBGzGMA' ) and tbl_43.col_304 between 'HpsjfuSReCwBoh' and 'fta' or not( tbl_43.col_304 between 'MFWmuOsoyDv' and 'TSeMYpDXnFIyp' ) order by col_304) x;
select max(col_304) from (select /*+ use_index_merge( tbl_43 ) */ * from tbl_43 where not( tbl_43.col_304 between 'YEpfYfPVvhMlHGHSMKm' and 'PE' ) or tbl_43.col_304 in ( 'LUBGzGMA' ) and tbl_43.col_304 between 'HpsjfuSReCwBoh' and 'fta' or not( tbl_43.col_304 between 'MFWmuOsoyDv' and 'TSeMYpDXnFIyp' ) order by col_304) x;
3 changes: 3 additions & 0 deletions tests/integrationtest/t/planner/core/point_get_plan.test
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ insert into t values (1, 1);
explain format='brief' select t.*, _tidb_rowid from t where a = 1;
commit;
explain format='brief' select count(_tidb_rowid) from t where a=1;
begin;
explain format='brief' select *, date_format(b, "") from t where a =1 for update;
rollback;
explain format='brief' select *, date_format(b, "") from t where a =1 for update;
create table t1 (pk int, a int, b int, primary key(pk), unique key(a));
-- error 1054
Expand Down