Skip to content

Commit

Permalink
Merge branch 'master' into fix_ddl_panic_loop_ever
Browse files Browse the repository at this point in the history
  • Loading branch information
AilinKid committed Mar 13, 2021
2 parents 414704a + b534ec7 commit 19abb34
Show file tree
Hide file tree
Showing 52 changed files with 656 additions and 226 deletions.
123 changes: 68 additions & 55 deletions bindinfo/bind_test.go

Large diffs are not rendered by default.

11 changes: 3 additions & 8 deletions bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/format"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/metrics"
Expand Down Expand Up @@ -689,14 +688,10 @@ func GenerateBindSQL(ctx context.Context, stmtNode ast.StmtNode, planHint string
// We need to evolve plan based on the current sql, not the original sql which may have different parameters.
// So here we would remove the hint and inject the current best plan hint.
hint.BindHint(stmtNode, &hint.HintsSet{})
var sb strings.Builder
restoreCtx := format.NewRestoreCtx(format.DefaultRestoreFlags, &sb)
restoreCtx.DefaultDB = defaultDB
err := stmtNode.Restore(restoreCtx)
if err != nil {
logutil.Logger(ctx).Debug("[sql-bind] restore SQL failed when generating bind SQL", zap.Error(err))
bindSQL := utilparser.RestoreWithDefaultDB(stmtNode, defaultDB, "")
if bindSQL == "" {
return ""
}
bindSQL := sb.String()
switch n := stmtNode.(type) {
case *ast.DeleteStmt:
deleteIdx := strings.Index(bindSQL, "DELETE")
Expand Down
2 changes: 2 additions & 0 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ func (e *HashAggExec) Close() error {
e.childResult = nil
e.groupSet, _ = set.NewStringSetWithMemoryUsage()
e.partialResultMap = nil
e.memTracker.ReplaceBytesUsed(0)
return e.baseExecutor.Close()
}
// `Close` may be called after `Open` without calling `Next` in test.
Expand All @@ -255,6 +256,7 @@ func (e *HashAggExec) Close() error {
for range e.finalOutputCh {
}
e.executed = false
e.memTracker.ReplaceBytesUsed(0)
return e.baseExecutor.Close()
}

Expand Down
4 changes: 2 additions & 2 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -1146,7 +1146,7 @@ func (e *AnalyzeFastExec) handleScanIter(iter kv.Iterator) (scanKeysSize int, er
}

func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err error) {
snapshot := e.ctx.GetStore().(tikv.Storage).GetSnapshot(kv.MaxVersion)
snapshot := e.ctx.GetStore().GetSnapshot(kv.MaxVersion)
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
Expand All @@ -1166,7 +1166,7 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err

func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) {
defer e.wg.Done()
snapshot := e.ctx.GetStore().(tikv.Storage).GetSnapshot(kv.MaxVersion)
snapshot := e.ctx.GetStore().GetSnapshot(kv.MaxVersion)
snapshot.SetOption(kv.NotFillCache, true)
snapshot.SetOption(kv.IsolationLevel, kv.RC)
snapshot.SetOption(kv.Priority, kv.PriorityLow)
Expand Down
3 changes: 2 additions & 1 deletion executor/mpp_gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import (
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
)

func useMPPExecution(ctx sessionctx.Context, tr *plannercore.PhysicalTableReader) bool {
if !ctx.GetSessionVars().AllowMPPExecution {
if !ctx.GetSessionVars().AllowMPPExecution || collate.NewCollationEnabled() {
return false
}
_, ok := tr.GetTablePlan().(*plannercore.PhysicalExchangeSender)
Expand Down
25 changes: 15 additions & 10 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,16 +518,21 @@ func (s *testSerialSuite1) TestSetVar(c *C) {
tk.MustExec("SET GLOBAL tidb_enable_extended_stats = off")
tk.MustQuery("select @@global.tidb_enable_extended_stats").Check(testkit.Rows("0"))

tk.MustExec("SET SESSION tidb_enable_tiflash_fallback_tikv = on")
tk.MustQuery("select @@session.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("1"))
tk.MustExec("SET SESSION tidb_enable_tiflash_fallback_tikv = off")
tk.MustQuery("select @@session.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("0"))
tk.MustExec("SET GLOBAL tidb_enable_tiflash_fallback_tikv = on")
tk.MustQuery("select @@global.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("1"))
tk.MustExec("SET GLOBAL tidb_enable_tiflash_fallback_tikv = off")
tk.MustQuery("select @@global.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("0"))
c.Assert(tk.ExecToErr("SET SESSION tidb_enable_tiflash_fallback_tikv = 123"), NotNil)
c.Assert(tk.ExecToErr("SET GLOBAL tidb_enable_tiflash_fallback_tikv = 321"), NotNil)
tk.MustExec("SET SESSION tidb_allow_fallback_to_tikv = 'tiflash'")
tk.MustQuery("select @@session.tidb_allow_fallback_to_tikv").Check(testkit.Rows("tiflash"))
tk.MustExec("SET SESSION tidb_allow_fallback_to_tikv = ''")
tk.MustQuery("select @@session.tidb_allow_fallback_to_tikv").Check(testkit.Rows(""))
tk.MustExec("SET GLOBAL tidb_allow_fallback_to_tikv = 'tiflash'")
tk.MustQuery("select @@global.tidb_allow_fallback_to_tikv").Check(testkit.Rows("tiflash"))
tk.MustExec("SET GLOBAL tidb_allow_fallback_to_tikv = ''")
tk.MustQuery("select @@global.tidb_allow_fallback_to_tikv").Check(testkit.Rows(""))
tk.MustExec("set @@tidb_allow_fallback_to_tikv = 'tiflash, tiflash, tiflash'")
tk.MustQuery("select @@tidb_allow_fallback_to_tikv").Check(testkit.Rows("tiflash"))

tk.MustGetErrMsg("SET SESSION tidb_allow_fallback_to_tikv = 'tikv,tiflash'", "[variable:1231]Variable 'tidb_allow_fallback_to_tikv' can't be set to the value of 'tikv,tiflash'")
tk.MustGetErrMsg("SET GLOBAL tidb_allow_fallback_to_tikv = 'tikv,tiflash'", "[variable:1231]Variable 'tidb_allow_fallback_to_tikv' can't be set to the value of 'tikv,tiflash'")
tk.MustGetErrMsg("set @@tidb_allow_fallback_to_tikv = 'tidb, tiflash, tiflash'", "[variable:1231]Variable 'tidb_allow_fallback_to_tikv' can't be set to the value of 'tidb, tiflash, tiflash'")
tk.MustGetErrMsg("set @@tidb_allow_fallback_to_tikv = 'unknown, tiflash, tiflash'", "[variable:1231]Variable 'tidb_allow_fallback_to_tikv' can't be set to the value of 'unknown, tiflash, tiflash'")

// Test issue #22145
tk.MustExec(`set global sync_relay_log = "'"`)
Expand Down
20 changes: 17 additions & 3 deletions executor/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/pingcap/tidb/store/mockstore/unistore"
"github.com/pingcap/tidb/store/tikv/mockstore/cluster"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
)

type tiflashTestSuite struct {
Expand All @@ -43,7 +42,6 @@ type tiflashTestSuite struct {
}

func (s *tiflashTestSuite) SetUpSuite(c *C) {
testleak.BeforeTest()
var err error
s.store, err = mockstore.NewMockStore(
mockstore.WithClusterInspector(func(c cluster.Cluster) {
Expand Down Expand Up @@ -280,6 +278,23 @@ func (s *tiflashTestSuite) TestPartitionTable(c *C) {
failpoint.Disable("github.com/pingcap/tidb/executor/checkUseMPP")
}

func (s *tiflashTestSuite) TestMppEnum(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int not null primary key, b enum('aca','bca','zca'))")
tk.MustExec("alter table t set tiflash replica 1")
tb := testGetTableByName(c, tk.Se, "test", "t")
err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)
tk.MustExec("insert into t values(1,'aca')")
tk.MustExec("insert into t values(2,'bca')")
tk.MustExec("insert into t values(3,'zca')")
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
tk.MustExec("set @@session.tidb_allow_mpp=ON")
tk.MustQuery("select t1.b from t t1 join t t2 on t1.a = t2.a order by t1.b").Check(testkit.Rows("aca", "bca", "zca"))
}

func (s *tiflashTestSuite) TestCancelMppTasks(c *C) {
var hang = "github.com/pingcap/tidb/store/mockstore/unistore/mppRecvHang"
tk := testkit.NewTestKit(c, s.store)
Expand Down Expand Up @@ -314,7 +329,6 @@ func (s *tiflashTestSuite) TestCancelMppTasks(c *C) {

// all goroutines exit if one goroutine hangs but another return errors
func (s *tiflashTestSuite) TestMppGoroutinesExitFromErrors(c *C) {
defer testleak.AfterTest(c)()
// mock non-root tasks return error
var mppNonRootTaskError = "github.com/pingcap/tidb/store/copr/mppNonRootTaskError"
// mock root tasks hang
Expand Down
15 changes: 13 additions & 2 deletions expression/expr_to_pb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,11 @@ func (s *testEvaluatorSuite) TestExprPushDownToFlash(c *C) {
c.Assert(err, IsNil)
exprs = append(exprs, function)

// ExtractDatetime: can be pushed
function, err = NewFunction(mock.NewContext(), ast.Extract, types.NewFieldType(mysql.TypeLonglong), stringColumn, datetimeColumn)
c.Assert(err, IsNil)
exprs = append(exprs, function)

// CastIntAsInt
function, err = NewFunction(mock.NewContext(), ast.Cast, types.NewFieldType(mysql.TypeLonglong), intColumn)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -725,9 +730,15 @@ func (s *testEvaluatorSuite) TestExprPushDownToFlash(c *C) {
function, err = NewFunction(mock.NewContext(), ast.JSONDepth, types.NewFieldType(mysql.TypeLonglong), jsonColumn)
c.Assert(err, IsNil)
exprs = append(exprs, function)

// ExtractDatetimeFromString: can not be pushed
function, err = NewFunction(mock.NewContext(), ast.Extract, types.NewFieldType(mysql.TypeLonglong), stringColumn, stringColumn)
c.Assert(err, IsNil)
exprs = append(exprs, function)

pushed, remained := PushDownExprs(sc, exprs, client, kv.TiFlash)
c.Assert(len(pushed), Equals, len(exprs)-1)
c.Assert(len(remained), Equals, 1)
c.Assert(len(pushed), Equals, len(exprs)-2)
c.Assert(len(remained), Equals, 2)
}

func (s *testEvaluatorSuite) TestExprOnlyPushDownToFlash(c *C) {
Expand Down
10 changes: 9 additions & 1 deletion expression/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -1070,6 +1070,7 @@ func canFuncBePushed(sf *ScalarFunction, storeType kv.StoreType) bool {
ast.TimestampDiff,
ast.DateAdd,
ast.FromUnixTime,
ast.Extract,

// encryption functions.
ast.MD5,
Expand Down Expand Up @@ -1243,7 +1244,7 @@ func CanExprsPushDown(sc *stmtctx.StatementContext, exprs []Expression, client k
func scalarExprSupportedByTiKV(function *ScalarFunction) bool {
switch function.FuncName.L {
case ast.Substr, ast.Substring, ast.DateAdd, ast.TimestampDiff,
ast.FromUnixTime:
ast.FromUnixTime, ast.Extract:
return false
default:
return true
Expand Down Expand Up @@ -1295,6 +1296,13 @@ func scalarExprSupportedByFlash(function *ScalarFunction) bool {
default:
return false
}
case ast.Extract:
switch function.Function.PbCode() {
case tipb.ScalarFuncSig_ExtractDatetime:
return true
default:
return false
}
default:
return false
}
Expand Down
9 changes: 9 additions & 0 deletions expression/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6301,6 +6301,15 @@ func (s *testIntegrationSerialSuite) TestCollationBasic(c *C) {
tk.MustQuery("select a from t_ci where a='A'").Check(testkit.Rows("a"))
tk.MustQuery("select a from t_ci where a='a '").Check(testkit.Rows("a"))
tk.MustQuery("select a from t_ci where a='a '").Check(testkit.Rows("a"))

tk.MustExec("drop table if exists t")
tk.MustExec("create table t(c set('A', 'B') collate utf8mb4_general_ci);")
tk.MustExec("insert into t values('a');")
tk.MustExec("insert into t values('B');")
tk.MustQuery("select c from t where c = 'a';").Check(testkit.Rows("A"))
tk.MustQuery("select c from t where c = 'A';").Check(testkit.Rows("A"))
tk.MustQuery("select c from t where c = 'b';").Check(testkit.Rows("B"))
tk.MustQuery("select c from t where c = 'B';").Check(testkit.Rows("B"))
}

func (s *testIntegrationSerialSuite) TestWeightString(c *C) {
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ require (
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20210223121704-3cd2fc5fad22
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8
github.com/pingcap/parser v0.0.0-20210303061548-f6776f61e268
github.com/pingcap/parser v0.0.0-20210310110710-c7333a4927e6
github.com/pingcap/sysutil v0.0.0-20210221112134-a07bda3bde99
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible
github.com/pingcap/tipb v0.0.0-20210308034246-066a76fd4e1b
github.com/pingcap/tipb v0.0.0-20210309080453-72c4feaa6da7
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.9.1
Expand Down
9 changes: 4 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -415,15 +415,15 @@ github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIf
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8 h1:M+DNpOu/I3uDmwee6vcnoPd6GgSMqND4gxvDQ/W584U=
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/parser v0.0.0-20210303061548-f6776f61e268 h1:yWlvSEhQPDVQU6pgFZv5sEWf94t/dUAMuBRFmLgkpek=
github.com/pingcap/parser v0.0.0-20210303061548-f6776f61e268/go.mod h1:GbEr2PgY72/4XqPZzmzstlOU/+il/wrjeTNFs6ihsSE=
github.com/pingcap/parser v0.0.0-20210310110710-c7333a4927e6 h1:V/6ioJmVUN4q6/aUpNdnT6OOPc48R3tnojcVfTrt4QU=
github.com/pingcap/parser v0.0.0-20210310110710-c7333a4927e6/go.mod h1:GbEr2PgY72/4XqPZzmzstlOU/+il/wrjeTNFs6ihsSE=
github.com/pingcap/sysutil v0.0.0-20200206130906-2bfa6dc40bcd/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI=
github.com/pingcap/sysutil v0.0.0-20210221112134-a07bda3bde99 h1:/ogXgm4guJzow4UafiyXZ6ciAIPzxImaXYiFvTpKzKY=
github.com/pingcap/sysutil v0.0.0-20210221112134-a07bda3bde99/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI=
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible h1:ceznmu/lLseGHP/jKyOa/3u/5H3wtLLLqkH2V3ssSjg=
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tipb v0.0.0-20210308034246-066a76fd4e1b h1:AvGm1DqSEwbGgiiu3KVuTtwLl3MqhbwwnJpx82l6/7M=
github.com/pingcap/tipb v0.0.0-20210308034246-066a76fd4e1b/go.mod h1:nsEhnMokcn7MRqd2J60yxpn/ac3ZH8A6GOJ9NslabUo=
github.com/pingcap/tipb v0.0.0-20210309080453-72c4feaa6da7 h1:j8MkWmy5tduhHVsdsgZJugN1U9OWTMSBQoZIpn8kqPc=
github.com/pingcap/tipb v0.0.0-20210309080453-72c4feaa6da7/go.mod h1:nsEhnMokcn7MRqd2J60yxpn/ac3ZH8A6GOJ9NslabUo=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down Expand Up @@ -844,7 +844,6 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.1.3 h1:qTakTkI6ni6LFD5sBwwsdSO+AQqbSIxOauHTTQKZ/7o=
honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las=
Expand Down
46 changes: 29 additions & 17 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1663,7 +1663,7 @@ func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P
return nil, false
}
joins := make([]PhysicalPlan, 0, 8)
if p.ctx.GetSessionVars().AllowMPPExecution {
if p.ctx.GetSessionVars().AllowMPPExecution && !collate.NewCollationEnabled() {
if p.shouldUseMPPBCJ() {
mppJoins := p.tryToGetMppHashJoin(prop, true)
if (p.preferJoinType & preferBCJoin) > 0 {
Expand Down Expand Up @@ -1712,6 +1712,29 @@ func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P
return joins, true
}

func canExprsInJoinPushdown(p *LogicalJoin, storeType kv.StoreType) bool {
equalExprs := make([]expression.Expression, 0, len(p.EqualConditions))
for _, eqCondition := range p.EqualConditions {
if eqCondition.FuncName.L == ast.NullEQ {
return false
}
equalExprs = append(equalExprs, eqCondition)
}
if !expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, equalExprs, p.ctx.GetClient(), storeType) {
return false
}
if !expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, p.LeftConditions, p.ctx.GetClient(), storeType) {
return false
}
if !expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, p.RightConditions, p.ctx.GetClient(), storeType) {
return false
}
if !expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, p.OtherConditions, p.ctx.GetClient(), storeType) {
return false
}
return true
}

func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBCJ bool) []PhysicalPlan {
if !prop.IsEmpty() {
return nil
Expand All @@ -1726,10 +1749,10 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC
if prop.PartitionTp == property.BroadcastType {
return nil
}
lkeys, rkeys, _, nullEQ := p.GetJoinKeys()
if nullEQ {
if !canExprsInJoinPushdown(p, kv.TiFlash) {
return nil
}
lkeys, rkeys, _, _ := p.GetJoinKeys()
// check match property
baseJoin := basePhysicalJoin{
JoinType: p.JoinType,
Expand Down Expand Up @@ -1810,8 +1833,7 @@ func (p *LogicalJoin) tryToGetBroadCastJoin(prop *property.PhysicalProperty) []P
if prop.TaskTp != property.RootTaskType && !prop.IsFlashProp() {
return nil
}
_, _, _, hasNullEQ := p.GetJoinKeys()
if hasNullEQ {
if !canExprsInJoinPushdown(p, kv.TiFlash) {
return nil
}

Expand All @@ -1828,16 +1850,6 @@ func (p *LogicalJoin) tryToGetBroadCastJoin(prop *property.PhysicalProperty) []P
return nil
}

if !expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, p.LeftConditions, p.ctx.GetClient(), kv.TiFlash) {
return nil
}
if !expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, p.RightConditions, p.ctx.GetClient(), kv.TiFlash) {
return nil
}
if !expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, p.OtherConditions, p.ctx.GetClient(), kv.TiFlash) {
return nil
}

// for left/semi/anti-semi join the global idx must be 1, and for right join the global idx must be 0
if hasPrefer, idx := p.getPreferredBCJLocalIndex(); hasPrefer {
if (idx == 0 && p.JoinType == RightOuterJoin) || (idx == 1 && (p.JoinType == LeftOuterJoin || p.JoinType == SemiJoin || p.JoinType == AntiSemiJoin)) {
Expand Down Expand Up @@ -2110,7 +2122,7 @@ func (p *baseLogicalPlan) canChildPushDown() bool {
return true
case *LogicalJoin, *LogicalProjection:
// TiFlash supports pushing down more operators
return p.SCtx().GetSessionVars().AllowBCJ || p.SCtx().GetSessionVars().AllowMPPExecution
return p.SCtx().GetSessionVars().AllowBCJ || (p.SCtx().GetSessionVars().AllowMPPExecution && !collate.NewCollationEnabled())
default:
return false
}
Expand Down Expand Up @@ -2334,7 +2346,7 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy
if la.ctx.GetSessionVars().AllowBCJ {
taskTypes = append(taskTypes, property.CopTiFlashLocalReadTaskType)
}
canPushDownToMPP := la.ctx.GetSessionVars().AllowMPPExecution && la.checkCanPushDownToMPP()
canPushDownToMPP := la.ctx.GetSessionVars().AllowMPPExecution && !collate.NewCollationEnabled() && la.checkCanPushDownToMPP()
if canPushDownToMPP {
taskTypes = append(taskTypes, property.MppTaskType)
}
Expand Down
Loading

0 comments on commit 19abb34

Please sign in to comment.