Skip to content

Commit

Permalink
cherry pick pingcap#36961 to release-5.2
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
tiancaiamao authored and ti-srebot committed Aug 24, 2022
1 parent aa97c67 commit 734a803
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 1 deletion.
4 changes: 4 additions & 0 deletions executor/union_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,11 @@ func (us *UnionScanExec) open(ctx context.Context) error {
func (us *UnionScanExec) Next(ctx context.Context, req *chunk.Chunk) error {
us.memBuf.RLock()
defer us.memBuf.RUnlock()

// Assume req.Capacity() > 0 after GrowAndReset(), if this assumption fail,
// the for-loop may exit without read one single row!
req.GrowAndReset(us.maxChunkSize)

mutableRow := chunk.MutRowFromTypes(retTypes(us))
for i, batchSize := 0, req.Capacity(); i < batchSize; i++ {
row, err := us.getOneRow(ctx)
Expand Down
167 changes: 167 additions & 0 deletions executor/union_scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,3 +402,170 @@ func (s *testSuite7) TestForApplyAndUnionScan(c *C) {
tk.MustQuery("select c_int, c_str from t where (select count(*) from t1 where t1.c_int in (t.c_int, t.c_int + 2, t.c_int + 10)) > 2").Check(testkit.Rows())
tk.MustExec("rollback")
}
<<<<<<< HEAD
=======

func TestIssue28073(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2")
tk.MustExec("create table t1 (c_int int, c_str varchar(40), primary key (c_int, c_str) , key(c_int)) partition by hash (c_int) partitions 4")
tk.MustExec("create table t2 like t1")
tk.MustExec("insert into t1 values (1, 'flamboyant mcclintock')")
tk.MustExec("insert into t2 select * from t1")

tk.MustExec("begin")
tk.MustExec("insert into t2 (c_int, c_str) values (2, 'romantic grothendieck')")
tk.MustQuery("select * from t2 left join t1 on t1.c_int = t2.c_int for update").Sort().Check(
testkit.Rows(
"1 flamboyant mcclintock 1 flamboyant mcclintock",
"2 romantic grothendieck <nil> <nil>",
))
tk.MustExec("commit")

// Check no key is written to table ID 0
txn, err := store.Begin()
require.NoError(t, err)
start := tablecodec.EncodeTablePrefix(0)
end := tablecodec.EncodeTablePrefix(1)
iter, err := txn.Iter(start, end)
require.NoError(t, err)

exist := false
for iter.Valid() {
require.Nil(t, iter.Next())
exist = true
break
}
require.False(t, exist)

// Another case, left join on partition table should not generate locks on physical ID = 0
tk.MustExec("drop table if exists t1, t2;")
tk.MustExec("create table t1 (c_int int, c_str varchar(40), primary key (c_int, c_str));")
tk.MustExec("create table t2 (c_int int, c_str varchar(40), primary key (c_int)) partition by hash (c_int) partitions 4;")
tk.MustExec("insert into t1 (`c_int`, `c_str`) values (1, 'upbeat solomon'), (5, 'sharp rubin');")
tk.MustExec("insert into t2 (`c_int`, `c_str`) values (1, 'clever haibt'), (4, 'kind margulis');")
tk.MustExec("begin pessimistic;")
tk.MustQuery("select * from t1 left join t2 on t1.c_int = t2.c_int for update;").Check(testkit.Rows(
"1 upbeat solomon 1 clever haibt",
"5 sharp rubin <nil> <nil>",
))
key, err := hex.DecodeString("7480000000000000005F728000000000000000")
require.NoError(t, err)
h := helper.NewHelper(store.(helper.Storage))
resp, err := h.GetMvccByEncodedKey(key)
require.NoError(t, err)
require.Nil(t, resp.Info.Lock)
require.Len(t, resp.Info.Writes, 0)
require.Len(t, resp.Info.Values, 0)

tk.MustExec("rollback;")
}

func TestIssue32422(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")

tk.MustExec("create table t (id int, c int, index(id));")
tk.MustExec("insert into t values (3,3), (4,4), (5,5);")
tk.MustExec("alter table t cache;")

var cacheUsed bool
for i := 0; i < 20; i++ {
tk.MustQuery("select id+1, c from t where c = 4;").Check(testkit.Rows("5 4"))
if tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache {
cacheUsed = true
break
}
time.Sleep(50 * time.Millisecond)
}
require.True(t, cacheUsed)

tk.MustQuery("select id+1, c from t where c = 4;").Check(testkit.Rows("5 4"))

// Some extra tests.
// Since cached table use UnionScanExec utilities, check what happens when they work together.
// In these cases, the cache data serve as the snapshot, tikv is skipped, and txn membuffer works the same way.
tk.MustExec("begin")
tk.MustQuery("select id+1, c from t where c = 4;").Check(testkit.Rows("5 4"))
tk.MustExec("insert into t values (6, 6)")
// Check for the new added data.
tk.HasPlan("select id+1, c from t where c = 6;", "UnionScan")
tk.MustQuery("select id+1, c from t where c = 6;").Check(testkit.Rows("7 6"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)
// Check for the old data.
tk.MustQuery("select id+1, c from t where c = 4;").Check(testkit.Rows("5 4"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)

// Point get
tk.HasPlan("select id+1, c from t where id = 6", "PointGet")
tk.MustQuery("select id+1, c from t where id = 6").Check(testkit.Rows("7 6"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)
tk.MustQuery("select id+1, c from t where id = 4").Check(testkit.Rows("5 4"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)

// Index Lookup
tk.HasPlan("select id+1, c from t where id = 6", "IndexLookUp")
tk.MustQuery("select id+1, c from t use index(id) where id = 6").Check(testkit.Rows("7 6"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)
tk.MustQuery("select id+1, c from t use index(id) where id = 4").Check(testkit.Rows("5 4"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)

// Index Reader
tk.HasPlan("select id from t where id = 6", "IndexReader")
tk.MustQuery("select id from t use index(id) where id = 6").Check(testkit.Rows("6"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)
tk.MustQuery("select id from t use index(id) where id = 4").Check(testkit.Rows("4"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)

tk.MustExec("rollback")
}

func TestIssue36903(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t_vwvgdc")

tk.MustExec("CREATE TABLE t_vwvgdc (wkey int, pkey int NOT NULL, c_rdsfbc double DEFAULT NULL, PRIMARY KEY (`pkey`));")
tk.MustExec("insert into t_vwvgdc values (2, 15000, 61.75);")
tk.MustExec("BEGIN OPTIMISTIC;")
tk.MustExec("insert into t_vwvgdc (wkey, pkey, c_rdsfbc) values (155, 228000, 99.50);")
tk.MustQuery("select pkey from t_vwvgdc where 0 <> 0 union select pkey from t_vwvgdc;")
}

func BenchmarkUnionScanRead(b *testing.B) {
store := testkit.CreateMockStore(b)

tk := testkit.NewTestKit(b, store)
tk.MustExec("use test")
tk.MustExec(`create table t_us (
c1 varchar(10),
c2 varchar(30),
c3 varchar(1),
c4 varchar(12),
c5 varchar(10),
c6 datetime);`)
tk.MustExec(`begin;`)
for i := 0; i < 8000; i++ {
tk.MustExec("insert into t_us values ('54321', '1234', '1', '000000', '7518', '2014-05-08')")
}

b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
tk.MustQuery("select * from t_us where c1 = '12345'").Check(testkit.Rows())
}
b.StopTimer()
}

func TestBenchDaily(t *testing.T) {
benchdaily.Run(
executor.BenchmarkReadLastLinesOfHugeLine,
BenchmarkUnionScanRead,
)
}
>>>>>>> 81a93a697... executor, util: fix UnionScan Next() skip reading data when passed chunk capacity is 0 (#36961)
6 changes: 5 additions & 1 deletion util/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,11 @@ func reCalcCapacity(c *Chunk, maxChunkSize int) int {
if c.NumRows() < c.capacity {
return c.capacity
}
return mathutil.Min(c.capacity*2, maxChunkSize)
newCapacity := c.capacity * 2
if newCapacity == 0 {
newCapacity = InitialCapacity
}
return mathutil.Min(newCapacity, maxChunkSize)
}

// Capacity returns the capacity of the Chunk.
Expand Down

0 comments on commit 734a803

Please sign in to comment.