From 7c5373dfeeb86873815705bbe9965291c1e0bb4a Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Tue, 12 Apr 2022 17:10:35 +0800 Subject: [PATCH] planner: Fix the issue that TiDB may dispatch duplicated tasks to TiFlash (#32813) (#32839) ref pingcap/tics#4163, close pingcap/tidb#32814 --- planner/core/fragment.go | 3 +- planner/core/fragment_test.go | 53 +++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) create mode 100644 planner/core/fragment_test.go diff --git a/planner/core/fragment.go b/planner/core/fragment.go index 8adcbf127ff26..188acf1c97555 100644 --- a/planner/core/fragment.go +++ b/planner/core/fragment.go @@ -125,7 +125,8 @@ func (f *Fragment) init(p PhysicalPlan) error { } f.TableScan = x case *PhysicalExchangeReceiver: - f.singleton = x.children[0].(*PhysicalExchangeSender).ExchangeType == tipb.ExchangeType_PassThrough + // TODO: after we support partial merge, we should check whether all the target exchangeReceiver is same. + f.singleton = f.singleton || x.children[0].(*PhysicalExchangeSender).ExchangeType == tipb.ExchangeType_PassThrough f.ExchangeReceivers = append(f.ExchangeReceivers, x) case *PhysicalUnionAll: return errors.New("unexpected union all detected") diff --git a/planner/core/fragment_test.go b/planner/core/fragment_test.go new file mode 100644 index 0000000000000..fa8ec9e99763c --- /dev/null +++ b/planner/core/fragment_test.go @@ -0,0 +1,53 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "github.com/pingcap/tipb/go-tipb" + "github.com/stretchr/testify/require" + + "testing" +) + +func TestFragmentInitSingleton(t *testing.T) { + r1, r2 := &PhysicalExchangeReceiver{}, &PhysicalExchangeReceiver{} + r1.SetChildren(&PhysicalExchangeSender{ExchangeType: tipb.ExchangeType_PassThrough}) + r2.SetChildren(&PhysicalExchangeSender{ExchangeType: tipb.ExchangeType_Broadcast}) + p := &PhysicalHashJoin{} + + f := &Fragment{} + p.SetChildren(r1, r1) + err := f.init(p) + require.NoError(t, err) + require.Equal(t, f.singleton, true) + + f = &Fragment{} + p.SetChildren(r1, r2) + err = f.init(p) + require.NoError(t, err) + require.Equal(t, f.singleton, true) + + f = &Fragment{} + p.SetChildren(r2, r1) + err = f.init(p) + require.NoError(t, err) + require.Equal(t, f.singleton, true) + + f = &Fragment{} + p.SetChildren(r2, r2) + err = f.init(p) + require.NoError(t, err) + require.Equal(t, f.singleton, false) +}