Skip to content

Commit

Permalink
planner: not push tiflash if extra phys tbl id is needed (#32830)
Browse files Browse the repository at this point in the history
close #32829
  • Loading branch information
windtalker authored Mar 5, 2022
1 parent 0d64d31 commit b0f823c
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 9 deletions.
69 changes: 60 additions & 9 deletions executor/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,15 @@ func (s *tiflashTestSuite) TestReadPartitionTable(c *C) {
tk.MustQuery("select /*+ STREAM_AGG() */ count(*) from t").Check(testkit.Rows("3"))
tk.MustQuery("select * from t order by a").Check(testkit.Rows("1 0", "2 0", "3 0"))

// test union scan
tk.MustExec("begin")
tk.MustExec("insert into t values(4,0)")
tk.MustQuery("select /*+ STREAM_AGG() */ count(*) from t").Check(testkit.Rows("4"))
tk.MustExec("insert into t values(5,0)")
tk.MustQuery("select /*+ STREAM_AGG() */ count(*) from t").Check(testkit.Rows("5"))
tk.MustExec("insert into t values(6,0)")
tk.MustQuery("select /*+ STREAM_AGG() */ count(*) from t").Check(testkit.Rows("6"))
tk.MustExec("commit")
// test union scan, enable it when https://github.com/pingcap/tics/issues/4180 is fixed
// tk.MustExec("begin")
// tk.MustExec("insert into t values(4,0)")
// tk.MustQuery("select /*+ STREAM_AGG() */ count(*) from t").Check(testkit.Rows("4"))
// tk.MustExec("insert into t values(5,0)")
// tk.MustQuery("select /*+ STREAM_AGG() */ count(*) from t").Check(testkit.Rows("5"))
// tk.MustExec("insert into t values(6,0)")
// tk.MustQuery("select /*+ STREAM_AGG() */ count(*) from t").Check(testkit.Rows("6"))
// tk.MustExec("commit")
}

func (s *tiflashTestSuite) TestReadUnsigedPK(c *C) {
Expand Down Expand Up @@ -1071,3 +1071,54 @@ func (s *tiflashTestSuite) TestForbidTiflashDuringStaleRead(c *C) {
c.Assert(strings.Contains(res, "tiflash"), IsFalse)
c.Assert(strings.Contains(res, "tikv"), IsTrue)
}

func (s *tiflashTestSuite) TestForbidTiFlashIfExtraPhysTableIDIsNeeded(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int not null primary key, b int not null) partition by hash(a) partitions 2")
tk.MustExec("alter table t set tiflash replica 1")
tb := testGetTableByName(c, tk.Se, "test", "t")
err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)
tk.MustExec("set tidb_partition_prune_mode=dynamic")
tk.MustExec("set tidb_enforce_mpp=1")

rows := tk.MustQuery("explain select count(*) from t").Rows()
resBuff := bytes.NewBufferString("")
for _, row := range rows {
fmt.Fprintf(resBuff, "%s\n", row)
}
res := resBuff.String()
c.Assert(strings.Contains(res, "tiflash"), IsTrue)
c.Assert(strings.Contains(res, "tikv"), IsFalse)

rows = tk.MustQuery("explain select count(*) from t for update").Rows()
resBuff = bytes.NewBufferString("")
for _, row := range rows {
fmt.Fprintf(resBuff, "%s\n", row)
}
res = resBuff.String()
c.Assert(strings.Contains(res, "tiflash"), IsFalse)
c.Assert(strings.Contains(res, "tikv"), IsTrue)

tk.MustExec("begin")
rows = tk.MustQuery("explain select count(*) from t").Rows()
resBuff = bytes.NewBufferString("")
for _, row := range rows {
fmt.Fprintf(resBuff, "%s\n", row)
}
res = resBuff.String()
c.Assert(strings.Contains(res, "tiflash"), IsTrue)
c.Assert(strings.Contains(res, "tikv"), IsFalse)
tk.MustExec("insert into t values(1,2)")
rows = tk.MustQuery("explain select count(*) from t").Rows()
resBuff = bytes.NewBufferString("")
for _, row := range rows {
fmt.Fprintf(resBuff, "%s\n", row)
}
res = resBuff.String()
c.Assert(strings.Contains(res, "tikv"), IsTrue)
c.Assert(strings.Contains(res, "tiflash"), IsFalse)
tk.MustExec("rollback")
}
6 changes: 6 additions & 0 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4020,6 +4020,12 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as
if err != nil {
return nil, err
}
if tableHasDirtyContent(b.ctx, tableInfo) && tableInfo.Partition != nil && b.optFlag&flagPartitionProcessor == 0 {
// if partition table has dirty content and the partition prune mode is dynamic, do not read
// from TiFlash because TiFlash does not support virtual column `ExtraPhysTblID` yet
b.ctx.GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because partition table `" + tableInfo.Name.O + "` has uncommitted data when partition prune mode is dynamic.")
possiblePaths = filterOutTiFlashPaths(possiblePaths)
}
// Skip storage engine check for CreateView.
if b.capFlag&canExpandAST == 0 {
possiblePaths, err = filterPathByIsolationRead(b.ctx, possiblePaths, tblName, dbName)
Expand Down
10 changes: 10 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1164,6 +1164,16 @@ func getPossibleAccessPaths(ctx sessionctx.Context, tableHints *tableHintInfo, i
return available, nil
}

func filterOutTiFlashPaths(paths []*util.AccessPath) []*util.AccessPath {
updatedPaths := make([]*util.AccessPath, 0, len(paths))
for _, path := range paths {
if path.StoreType != kv.TiFlash {
updatedPaths = append(updatedPaths, path)
}
}
return updatedPaths
}

func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath, tblName model.CIStr, dbName model.CIStr) ([]*util.AccessPath, error) {
// TODO: filter paths with isolation read locations.
if dbName.L == mysql.SystemDB {
Expand Down

0 comments on commit b0f823c

Please sign in to comment.