revert mpp_gather related code
Signed-off-by: guo-shaoge <shaoge1994@163.com>
guo-shaoge committed Feb 13, 2023
1 parent b11cef5 commit 3a8afe4
Showing 7 changed files with 13 additions and 115 deletions.
executor/builder.go (0 additions & 21 deletions)

@@ -3443,29 +3443,8 @@ func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Exe
 		startTS:    startTs,
 		mppQueryID: kv.MPPQueryID{QueryTs: getMPPQueryTS(b.ctx), LocalQueryID: getMPPQueryID(b.ctx), ServerID: domain.GetDomain(b.ctx).ServerID()},
 		memTracker: memory.NewTracker(v.ID(), -1),
-
-		// To fill virtual column.
-		columns:                    []*model.ColumnInfo{},
-		virtualColumnIndex:         []int{},
-		virtualColumnRetFieldTypes: []*types.FieldType{},
 	}
-	var hasVirtualCol bool
-	for _, col := range v.Schema().Columns {
-		if col.VirtualExpr != nil {
-			hasVirtualCol = true
-		}
-	}
-	if hasVirtualCol {
-		ts, err := v.GetTableScan()
-		if err != nil {
-			b.err = err
-			return nil
-		}
-		gather.columns = ts.Columns
-		gather.virtualColumnIndex, gather.virtualColumnRetFieldTypes = buildVirtualColumnInfo(gather.Schema(), gather.columns)
-	}
 	gather.memTracker.AttachTo(b.ctx.GetSessionVars().StmtCtx.MemTracker)
-
 	return gather
 }
executor/mpp_gather.go (1 addition & 15 deletions)

@@ -23,11 +23,8 @@ import (
 	"github.com/pingcap/tidb/distsql"
 	"github.com/pingcap/tidb/infoschema"
 	"github.com/pingcap/tidb/kv"
-	"github.com/pingcap/tidb/parser/model"
 	plannercore "github.com/pingcap/tidb/planner/core"
 	"github.com/pingcap/tidb/sessionctx"
-	"github.com/pingcap/tidb/table"
-	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/chunk"
 	"github.com/pingcap/tidb/util/logutil"
 	"github.com/pingcap/tidb/util/memory"
@@ -69,10 +66,6 @@ type MPPGather struct {
 	respIter distsql.SelectResult

 	memTracker *memory.Tracker
-	columns    []*model.ColumnInfo
-
-	virtualColumnIndex         []int
-	virtualColumnRetFieldTypes []*types.FieldType
 }

 func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error {
@@ -166,14 +159,7 @@ func (e *MPPGather) Open(ctx context.Context) (err error) {
 // Next fills data into the chunk passed by its caller.
 func (e *MPPGather) Next(ctx context.Context, chk *chunk.Chunk) error {
 	err := e.respIter.Next(ctx, chk)
-	if err != nil {
-		return err
-	}
-	err = table.FillVirtualColumnValue(e.virtualColumnRetFieldTypes, e.virtualColumnIndex, e.schema.Columns, e.columns, e.ctx, chk)
-	if err != nil {
-		return err
-	}
-	return nil
+	return errors.Trace(err)
 }

 // Close and release the used resources.
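Taken together, the lines removed from executor/builder.go and executor/mpp_gather.go implemented a fetch-then-fill scheme for virtual (generated) columns: at build time the gather recorded which schema columns are virtual, and at read time Next() evaluated those columns locally after each chunk arrived from TiFlash. A minimal sketch of that scheme, assembled from the deleted lines above (identifiers as they appear in the diff; error handling elided, not a drop-in implementation):

    // Build time (deleted from buildMPPGather): detect virtual columns in the
    // reader's schema and cache the fill metadata on the MPPGather executor.
    hasVirtualCol := false
    for _, col := range v.Schema().Columns {
        if col.VirtualExpr != nil { // computed from an expression, not stored
            hasVirtualCol = true
        }
    }
    if hasVirtualCol {
        ts, _ := v.GetTableScan()
        gather.columns = ts.Columns
        gather.virtualColumnIndex, gather.virtualColumnRetFieldTypes =
            buildVirtualColumnInfo(gather.Schema(), gather.columns)
    }

    // Read time (deleted from MPPGather.Next): fetch a chunk from the remote
    // result iterator, then fill the virtual columns before returning it.
    if err := e.respIter.Next(ctx, chk); err != nil {
        return err
    }
    return table.FillVirtualColumnValue(e.virtualColumnRetFieldTypes,
        e.virtualColumnIndex, e.schema.Columns, e.columns, e.ctx, chk)
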
executor/table_reader.go (6 additions & 11 deletions)

@@ -457,20 +457,15 @@ func buildVirtualColumnIndex(schema *expression.Schema, columns []*model.ColumnI
 	return virtualColumnIndex
 }

-func (e *TableReaderExecutor) buildVirtualColumnInfo() {
-	e.virtualColumnIndex, e.virtualColumnRetFieldTypes = buildVirtualColumnInfo(e.Schema(), e.columns)
-}
-
 // buildVirtualColumnInfo saves virtual column indices and sort them in definition order
-func buildVirtualColumnInfo(schema *expression.Schema, columns []*model.ColumnInfo) (colIndexs []int, retTypes []*types.FieldType) {
-	colIndexs = buildVirtualColumnIndex(schema, columns)
-	if len(colIndexs) > 0 {
-		retTypes = make([]*types.FieldType, len(colIndexs))
-		for i, idx := range colIndexs {
-			retTypes[i] = schema.Columns[idx].RetType
+func (e *TableReaderExecutor) buildVirtualColumnInfo() {
+	e.virtualColumnIndex = buildVirtualColumnIndex(e.Schema(), e.columns)
+	if len(e.virtualColumnIndex) > 0 {
+		e.virtualColumnRetFieldTypes = make([]*types.FieldType, len(e.virtualColumnIndex))
+		for i, idx := range e.virtualColumnIndex {
+			e.virtualColumnRetFieldTypes[i] = e.schema.Columns[idx].RetType
 		}
 	}
-	return colIndexs, retTypes
 }

 type tableResultHandler struct {
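The method restored above delegates detection to buildVirtualColumnIndex, whose tail (return virtualColumnIndex) opens this hunk. A hedged sketch of its contract, inferred from the doc comment ("saves virtual column indices and sort them in definition order") rather than copied from the source; the offsetByID helper is assumed for illustration:

    // Sketch: collect schema positions whose column is virtual, then order
    // them by the column's offset in the table definition so later fills
    // happen in definition order. Not the verbatim TiDB implementation.
    func buildVirtualColumnIndexSketch(schema *expression.Schema, columns []*model.ColumnInfo) []int {
        idxs := make([]int, 0, len(columns))
        for i, col := range schema.Columns {
            if col.VirtualExpr != nil {
                idxs = append(idxs, i)
            }
        }
        sort.Slice(idxs, func(i, j int) bool {
            return offsetByID(columns, schema.Columns[idxs[i]].ID) <
                offsetByID(columns, schema.Columns[idxs[j]].ID)
        })
        return idxs
    }

    // offsetByID is an assumed helper: definition offset of the column info
    // carrying the given column ID, or -1 if absent.
    func offsetByID(columns []*model.ColumnInfo, id int64) int {
        for _, c := range columns {
            if c.ID == id {
                return c.Offset
            }
        }
        return -1
    }
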
executor/tiflashtest/tiflash_test.go (0 additions & 35 deletions)

@@ -1452,38 +1452,3 @@ func TestMPPMemoryTracker(t *testing.T) {
 	require.NotNil(t, err)
 	require.True(t, strings.Contains(err.Error(), "Out Of Memory Quota!"))
 }
-
-func TestDisaggregatedTiFlashGenColumn(t *testing.T) {
-	config.UpdateGlobal(func(conf *config.Config) {
-		conf.DisaggregatedTiFlash = true
-	})
-	defer config.UpdateGlobal(func(conf *config.Config) {
-		conf.DisaggregatedTiFlash = false
-	})
-
-	store := testkit.CreateMockStore(t, withMockTiFlash(2))
-	tk := testkit.NewTestKit(t, store)
-	tk.MustExec("use test")
-	tk.MustExec("drop table if exists tt1;")
-	tk.MustExec("create table tt1(c0 int, c1 varchar(100), c2 varchar(100) AS (lower(c1))) partition by hash(c0) partitions 2;")
-	tk.MustExec("insert into tt1(c0, c1) values(1, 'ABC'), (2, 'DEF');")
-	tk.MustExec("alter table tt1 set tiflash replica 1;")
-	tb := external.GetTableByName(t, tk, "test", "tt1")
-	err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
-	require.NoError(t, err)
-	tk.MustQuery("explain select sum(c0) from tt1;").Check(testkit.Rows(
-		"HashAgg_15 1.00 root funcs:sum(Column#6)->Column#5",
-		"└─PartitionUnion_16 2.00 root ",
-		"  ├─HashAgg_34 1.00 root funcs:sum(Column#8)->Column#6",
-		"  │ └─TableReader_36 1.00 root MppVersion: 1, data:ExchangeSender_35",
-		"  │   └─ExchangeSender_35 1.00 mpp[tiflash] ExchangeType: PassThrough",
-		"  │     └─HashAgg_21 1.00 mpp[tiflash] funcs:sum(Column#15)->Column#8",
-		"  │       └─Projection_82 10000.00 mpp[tiflash] cast(test.tt1.c0, decimal(10,0) BINARY)->Column#15",
-		"  │         └─TableFullScan_33 10000.00 mpp[tiflash] table:tt1, partition:p0 keep order:false, stats:pseudo",
-		"  └─HashAgg_62 1.00 root funcs:sum(Column#11)->Column#6",
-		"    └─TableReader_64 1.00 root MppVersion: 1, data:ExchangeSender_63",
-		"      └─ExchangeSender_63 1.00 mpp[tiflash] ExchangeType: PassThrough",
-		"        └─HashAgg_49 1.00 mpp[tiflash] funcs:sum(Column#16)->Column#11",
-		"          └─Projection_83 10000.00 mpp[tiflash] cast(test.tt1.c0, decimal(10,0) BINARY)->Column#16",
-		"            └─TableFullScan_61 10000.00 mpp[tiflash] table:tt1, partition:p1 keep order:false, stats:pseudo"))
expression/integration_test.go (2 additions & 14 deletions)

@@ -3771,31 +3771,19 @@ func TestShardIndexOnTiFlash(t *testing.T) {
 			}
 		}
 	}
-	tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'")
 	tk.MustExec("set @@session.tidb_enforce_mpp = 1")
 	rows := tk.MustQuery("explain select max(b) from t").Rows()
-	var hasTableFullScan bool
 	for _, row := range rows {
 		line := fmt.Sprintf("%v", row)
-		if strings.Contains(line, "TableFullScan") {
-			hasTableFullScan = true
-			require.Contains(t, line, "mpp[tiflash]")
-		}
+		require.NotContains(t, line, "tiflash")
 	}
-	require.True(t, hasTableFullScan)
-
 	tk.MustExec("set @@session.tidb_enforce_mpp = 0")
 	tk.MustExec("set @@session.tidb_allow_mpp = 0")
 	rows = tk.MustQuery("explain select max(b) from t").Rows()
-	hasTableFullScan = false
 	for _, row := range rows {
 		line := fmt.Sprintf("%v", row)
-		if strings.Contains(line, "TableFullScan") {
-			hasTableFullScan = true
-			require.Contains(t, line, "tiflash")
-		}
+		require.NotContains(t, line, "tiflash")
 	}
-	require.True(t, hasTableFullScan)
 }

 func TestExprPushdownBlacklist(t *testing.T) {
planner/core/find_best_task.go (4 additions & 8 deletions)

@@ -2022,14 +2022,10 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid
 		ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because table `" + ds.tableInfo.Name.O + "`is a partition table which is not supported when `@@tidb_partition_prune_mode=static`.")
 		return invalidTask, nil
 	}
-	if !isDisaggregatedTiFlash || !canMppConvertToRootForDisaggregatedTiFlash {
-		// Normally, cannot generate mppTask when got virtual column.
-		// But in disaggregated tiflash mode, we can convert mppTask of TableScan to rootTask directly.
-		for _, col := range ts.schema.Columns {
-			if col.VirtualExpr != nil {
-				ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because column `" + col.OrigName + "` is a virtual column which is not supported now.")
-				return invalidTask, nil
-			}
+	for _, col := range ts.schema.Columns {
+		if col.VirtualExpr != nil {
+			ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because column `" + col.OrigName + "` is a virtual column which is not supported now.")
+			return invalidTask, nil
 		}
 	}
 	mppTask := &mppTask{
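With the disaggregated-TiFlash bypass removed, a table scan whose schema contains any virtual column falls back from MPP again and raises the warning above. For illustration, reusing the generated-column table from the test deleted earlier in this commit (a testkit-style sketch, not a test that exists in the tree):

    // tt1.c2 is a virtual generated column, so MPP task generation is refused.
    tk.MustExec("create table tt1(c0 int, c1 varchar(100), c2 varchar(100) AS (lower(c1)))")
    tk.MustExec("alter table tt1 set tiflash replica 1")
    tk.MustExec("set @@session.tidb_enforce_mpp = 1")
    tk.MustQuery("explain select sum(c0) from tt1") // no mpp[tiflash] operators
    // expected session warning (message string from the code above), e.g.:
    //   MPP mode may be blocked because column `test.tt1.c2` is a virtual
    //   column which is not supported now.
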
planner/core/task.go (0 additions & 11 deletions)

@@ -2187,18 +2187,7 @@ func accumulateNetSeekCost4MPP(p PhysicalPlan) (cost float64) {
 	return
 }

-func tryExpandVirtualColumn(p PhysicalPlan) {
-	if ts, ok := p.(*PhysicalTableScan); ok {
-		ts.Columns = ExpandVirtualColumn(ts.Columns, ts.schema, ts.Table.Columns)
-		return
-	}
-	for _, child := range p.Children() {
-		tryExpandVirtualColumn(child)
-	}
-}
-
 func (t *mppTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask {
-	tryExpandVirtualColumn(t.p)
 	sender := PhysicalExchangeSender{
 		ExchangeType: tipb.ExchangeType_PassThrough,
 	}.Init(ctx, t.p.statsInfo())
