diff --git a/executor/union_scan.go b/executor/union_scan.go index 9a332887c7b46..75b8e9be289e9 100644 --- a/executor/union_scan.go +++ b/executor/union_scan.go @@ -111,7 +111,11 @@ func (us *UnionScanExec) open(ctx context.Context) error { func (us *UnionScanExec) Next(ctx context.Context, req *chunk.Chunk) error { us.memBuf.RLock() defer us.memBuf.RUnlock() + + // Assume req.Capacity() > 0 after GrowAndReset(), if this assumption fail, + // the for-loop may exit without read one single row! req.GrowAndReset(us.maxChunkSize) + mutableRow := chunk.MutRowFromTypes(retTypes(us)) for i, batchSize := 0, req.Capacity(); i < batchSize; i++ { row, err := us.getOneRow(ctx) diff --git a/executor/union_scan_test.go b/executor/union_scan_test.go index 31f9cc6b98670..8a71fc4b0832a 100644 --- a/executor/union_scan_test.go +++ b/executor/union_scan_test.go @@ -402,3 +402,170 @@ func (s *testSuite7) TestForApplyAndUnionScan(c *C) { tk.MustQuery("select c_int, c_str from t where (select count(*) from t1 where t1.c_int in (t.c_int, t.c_int + 2, t.c_int + 10)) > 2").Check(testkit.Rows()) tk.MustExec("rollback") } +<<<<<<< HEAD +======= + +func TestIssue28073(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1, t2") + tk.MustExec("create table t1 (c_int int, c_str varchar(40), primary key (c_int, c_str) , key(c_int)) partition by hash (c_int) partitions 4") + tk.MustExec("create table t2 like t1") + tk.MustExec("insert into t1 values (1, 'flamboyant mcclintock')") + tk.MustExec("insert into t2 select * from t1") + + tk.MustExec("begin") + tk.MustExec("insert into t2 (c_int, c_str) values (2, 'romantic grothendieck')") + tk.MustQuery("select * from t2 left join t1 on t1.c_int = t2.c_int for update").Sort().Check( + testkit.Rows( + "1 flamboyant mcclintock 1 flamboyant mcclintock", + "2 romantic grothendieck ", + )) + tk.MustExec("commit") + + // Check no key is written to table ID 0 + txn, err := store.Begin() + require.NoError(t, err) + start := tablecodec.EncodeTablePrefix(0) + end := tablecodec.EncodeTablePrefix(1) + iter, err := txn.Iter(start, end) + require.NoError(t, err) + + exist := false + for iter.Valid() { + require.Nil(t, iter.Next()) + exist = true + break + } + require.False(t, exist) + + // Another case, left join on partition table should not generate locks on physical ID = 0 + tk.MustExec("drop table if exists t1, t2;") + tk.MustExec("create table t1 (c_int int, c_str varchar(40), primary key (c_int, c_str));") + tk.MustExec("create table t2 (c_int int, c_str varchar(40), primary key (c_int)) partition by hash (c_int) partitions 4;") + tk.MustExec("insert into t1 (`c_int`, `c_str`) values (1, 'upbeat solomon'), (5, 'sharp rubin');") + tk.MustExec("insert into t2 (`c_int`, `c_str`) values (1, 'clever haibt'), (4, 'kind margulis');") + tk.MustExec("begin pessimistic;") + tk.MustQuery("select * from t1 left join t2 on t1.c_int = t2.c_int for update;").Check(testkit.Rows( + "1 upbeat solomon 1 clever haibt", + "5 sharp rubin ", + )) + key, err := hex.DecodeString("7480000000000000005F728000000000000000") + require.NoError(t, err) + h := helper.NewHelper(store.(helper.Storage)) + resp, err := h.GetMvccByEncodedKey(key) + require.NoError(t, err) + require.Nil(t, resp.Info.Lock) + require.Len(t, resp.Info.Writes, 0) + require.Len(t, resp.Info.Values, 0) + + tk.MustExec("rollback;") +} + +func TestIssue32422(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + + tk.MustExec("create table t (id int, c int, index(id));") + tk.MustExec("insert into t values (3,3), (4,4), (5,5);") + tk.MustExec("alter table t cache;") + + var cacheUsed bool + for i := 0; i < 20; i++ { + tk.MustQuery("select id+1, c from t where c = 4;").Check(testkit.Rows("5 4")) + if tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache { + cacheUsed = true + break + } + time.Sleep(50 * time.Millisecond) + } + require.True(t, cacheUsed) + + tk.MustQuery("select id+1, c from t where c = 4;").Check(testkit.Rows("5 4")) + + // Some extra tests. + // Since cached table use UnionScanExec utilities, check what happens when they work together. + // In these cases, the cache data serve as the snapshot, tikv is skipped, and txn membuffer works the same way. + tk.MustExec("begin") + tk.MustQuery("select id+1, c from t where c = 4;").Check(testkit.Rows("5 4")) + tk.MustExec("insert into t values (6, 6)") + // Check for the new added data. + tk.HasPlan("select id+1, c from t where c = 6;", "UnionScan") + tk.MustQuery("select id+1, c from t where c = 6;").Check(testkit.Rows("7 6")) + require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache) + // Check for the old data. + tk.MustQuery("select id+1, c from t where c = 4;").Check(testkit.Rows("5 4")) + require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache) + + // Point get + tk.HasPlan("select id+1, c from t where id = 6", "PointGet") + tk.MustQuery("select id+1, c from t where id = 6").Check(testkit.Rows("7 6")) + require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache) + tk.MustQuery("select id+1, c from t where id = 4").Check(testkit.Rows("5 4")) + require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache) + + // Index Lookup + tk.HasPlan("select id+1, c from t where id = 6", "IndexLookUp") + tk.MustQuery("select id+1, c from t use index(id) where id = 6").Check(testkit.Rows("7 6")) + require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache) + tk.MustQuery("select id+1, c from t use index(id) where id = 4").Check(testkit.Rows("5 4")) + require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache) + + // Index Reader + tk.HasPlan("select id from t where id = 6", "IndexReader") + tk.MustQuery("select id from t use index(id) where id = 6").Check(testkit.Rows("6")) + require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache) + tk.MustQuery("select id from t use index(id) where id = 4").Check(testkit.Rows("4")) + require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache) + + tk.MustExec("rollback") +} + +func TestIssue36903(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t_vwvgdc") + + tk.MustExec("CREATE TABLE t_vwvgdc (wkey int, pkey int NOT NULL, c_rdsfbc double DEFAULT NULL, PRIMARY KEY (`pkey`));") + tk.MustExec("insert into t_vwvgdc values (2, 15000, 61.75);") + tk.MustExec("BEGIN OPTIMISTIC;") + tk.MustExec("insert into t_vwvgdc (wkey, pkey, c_rdsfbc) values (155, 228000, 99.50);") + tk.MustQuery("select pkey from t_vwvgdc where 0 <> 0 union select pkey from t_vwvgdc;") +} + +func BenchmarkUnionScanRead(b *testing.B) { + store := testkit.CreateMockStore(b) + + tk := testkit.NewTestKit(b, store) + tk.MustExec("use test") + tk.MustExec(`create table t_us ( +c1 varchar(10), +c2 varchar(30), +c3 varchar(1), +c4 varchar(12), +c5 varchar(10), +c6 datetime);`) + tk.MustExec(`begin;`) + for i := 0; i < 8000; i++ { + tk.MustExec("insert into t_us values ('54321', '1234', '1', '000000', '7518', '2014-05-08')") + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + tk.MustQuery("select * from t_us where c1 = '12345'").Check(testkit.Rows()) + } + b.StopTimer() +} + +func TestBenchDaily(t *testing.T) { + benchdaily.Run( + executor.BenchmarkReadLastLinesOfHugeLine, + BenchmarkUnionScanRead, + ) +} +>>>>>>> 81a93a697... executor, util: fix UnionScan Next() skip reading data when passed chunk capacity is 0 (#36961) diff --git a/util/chunk/chunk.go b/util/chunk/chunk.go index a15279e1a7ada..cbf8856457425 100644 --- a/util/chunk/chunk.go +++ b/util/chunk/chunk.go @@ -331,7 +331,11 @@ func reCalcCapacity(c *Chunk, maxChunkSize int) int { if c.NumRows() < c.capacity { return c.capacity } - return mathutil.Min(c.capacity*2, maxChunkSize) + newCapacity := c.capacity * 2 + if newCapacity == 0 { + newCapacity = InitialCapacity + } + return mathutil.Min(newCapacity, maxChunkSize) } // Capacity returns the capacity of the Chunk.