Skip to content

Commit

Permalink
planner: add MppVersion for mpp task; support data compression in Exc…
Browse files Browse the repository at this point in the history
…hange Operator; (pingcap#40132)

ref pingcap/tiflash#6620, close pingcap#40494
  • Loading branch information
solotzg authored and ghazalfamilyusa committed Feb 6, 2023
1 parent 5368c6c commit e65cb0e
Show file tree
Hide file tree
Showing 27 changed files with 1,292 additions and 768 deletions.
2 changes: 1 addition & 1 deletion distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.
ctx = WithSQLKvExecCounterInterceptor(ctx, sctx.GetSessionVars().StmtCtx)
_, allowTiFlashFallback := sctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash]
ctx = SetTiFlashMaxThreadsInContext(ctx, sctx)
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks, allowTiFlashFallback, startTs, mppQueryID)
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks, allowTiFlashFallback, startTs, mppQueryID, sctx.GetSessionVars().ChooseMppVersion())
if resp == nil {
return nil, errors.New("client returns nil response")
}
Expand Down
1 change: 1 addition & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3401,6 +3401,7 @@ func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Exe
b.err = err
return nil
}

gather := &MPPGather{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
is: b.is,
Expand Down
6 changes: 5 additions & 1 deletion executor/mpp_gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,14 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error {
if err != nil {
return errors.Trace(err)
}

logutil.BgLogger().Info("Dispatch mpp task", zap.Uint64("timestamp", mppTask.StartTs),
zap.Int64("ID", mppTask.ID), zap.Uint64("QueryTs", mppTask.MppQueryID.QueryTs), zap.Uint64("LocalQueryId", mppTask.MppQueryID.LocalQueryID),
zap.Uint64("ServerID", mppTask.MppQueryID.ServerID), zap.String("address", mppTask.Meta.GetAddress()),
zap.String("plan", plannercore.ToString(pf.ExchangeSender)))
zap.String("plan", plannercore.ToString(pf.ExchangeSender)),
zap.Int64("mpp-version", mppTask.MppVersion.ToInt64()),
zap.String("exchange-compression-mode", pf.ExchangeSender.CompressionMode.Name()),
)
req := &kv.MPPDispatchRequest{
Data: pbData,
Meta: mppTask.Meta,
Expand Down
56 changes: 56 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2071,3 +2071,59 @@ func TestSetChunkReuseVariable(t *testing.T) {
// error value
tk.MustGetErrCode("set @@tidb_enable_reuse_chunk=s;", errno.ErrWrongValueForVar)
}

func TestSetMppVersionVariable(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustQuery("select @@session.mpp_version").Check(testkit.Rows("UNSPECIFIED"))
tk.MustExec("SET SESSION mpp_version = -1")
tk.MustQuery("select @@session.mpp_version").Check(testkit.Rows("-1"))
tk.MustExec("SET SESSION mpp_version = 0")
tk.MustQuery("select @@session.mpp_version").Check(testkit.Rows("0"))
tk.MustExec("SET SESSION mpp_version = 1")
tk.MustQuery("select @@session.mpp_version").Check(testkit.Rows("1"))
tk.MustExec("SET SESSION mpp_version = unspecified")
tk.MustQuery("select @@session.mpp_version").Check(testkit.Rows("unspecified"))
{
tk.MustGetErrMsg("SET SESSION mpp_version = 2", "incorrect value: 2. mpp_version options: -1 (unspecified), 0, 1")
}
{
tk.MustExec("SET GLOBAL mpp_version = 1")
tk.MustQuery("select @@global.mpp_version").Check(testkit.Rows("1"))
tk.MustExec("SET GLOBAL mpp_version = -1")
tk.MustQuery("select @@global.mpp_version").Check(testkit.Rows("-1"))
}
}

func TestSetMppExchangeCompressionModeVariable(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustGetErrMsg(
"SET SESSION mpp_exchange_compression_mode = 123",
"incorrect value: `123`. mpp_exchange_compression_mode options: NONE, FAST, HIGH_COMPRESSION, UNSPECIFIED")
tk.MustQuery("select @@session.mpp_exchange_compression_mode").Check(testkit.Rows("UNSPECIFIED"))

tk.MustExec("SET SESSION mpp_exchange_compression_mode = none")
tk.MustQuery("select @@session.mpp_exchange_compression_mode").Check(testkit.Rows("none"))
tk.MustExec("SET SESSION mpp_exchange_compression_mode = fast")
tk.MustQuery("select @@session.mpp_exchange_compression_mode").Check(testkit.Rows("fast"))
tk.MustExec("SET SESSION mpp_exchange_compression_mode = HIGH_COMPRESSION")
tk.MustQuery("select @@session.mpp_exchange_compression_mode").Check(testkit.Rows("HIGH_COMPRESSION"))

{
tk.MustExec("SET GLOBAL mpp_exchange_compression_mode = none")
tk.MustQuery("select @@global.mpp_exchange_compression_mode").Check(testkit.Rows("none"))
}
{
tk.MustExec("SET mpp_version = 0")
tk.MustExec("SET mpp_exchange_compression_mode = unspecified")
require.Equal(t, len(tk.Session().GetSessionVars().StmtCtx.GetWarnings()), 0)
}
{
tk.MustExec("SET mpp_version = 0")
tk.MustExec("SET mpp_exchange_compression_mode = HIGH_COMPRESSION")
warnings := tk.Session().GetSessionVars().StmtCtx.GetWarnings()
require.Equal(t, len(warnings), 1)
require.Equal(t, warnings[0].Err.Error(), "mpp exchange compression won't work under current mpp version 0")
}
}
1 change: 1 addition & 0 deletions executor/tiflashtest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_test(
"//config",
"//domain",
"//executor",
"//kv",
"//meta/autoid",
"//parser/terror",
"//planner/core",
Expand Down
40 changes: 36 additions & 4 deletions executor/tiflashtest/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/terror"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/store/mockstore"
Expand Down Expand Up @@ -629,10 +630,41 @@ func TestDispatchTaskRetry(t *testing.T) {
require.NoError(t, err)
tk.MustExec("set @@session.tidb_enforce_mpp=ON")
require.Nil(t, failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/mppDispatchTimeout", "3*return(true)"))
tk.MustQuery("select count(*) from t").Check(testkit.Rows("4"))
tk.MustQuery("select count(*) from t group by b").Check(testkit.Rows("4"))
require.Nil(t, failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/mppDispatchTimeout"))
}

func TestMppVersionError(t *testing.T) {
store := testkit.CreateMockStore(t, withMockTiFlash(2))
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int not null primary key, b int not null)")
tk.MustExec("alter table t set tiflash replica 1")
tk.MustExec("insert into t values(1,0),(2,0),(3,0),(4,0)")
tb := external.GetTableByName(t, tk, "test", "t")
err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
tk.MustExec("set @@session.tidb_enforce_mpp=ON")
{
item := fmt.Sprintf("return(%d)", kv.GetNewestMppVersion()+1)
require.Nil(t, failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/MppVersionError", item))
}
{
err := tk.QueryToErr("select count(*) from t group by b")
require.Error(t, err)
}
require.Nil(t, failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/MppVersionError"))
{
item := fmt.Sprintf("return(%d)", kv.GetNewestMppVersion())
require.Nil(t, failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/MppVersionError", item))
}
{
tk.MustQuery("select count(*) from t group by b").Check(testkit.Rows("4"))
}
require.Nil(t, failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/MppVersionError"))
}

func TestCancelMppTasks(t *testing.T) {
var hang = "github.com/pingcap/tidb/store/mockstore/unistore/mppRecvHang"
store := testkit.CreateMockStore(t, withMockTiFlash(2))
Expand Down Expand Up @@ -1325,15 +1357,15 @@ func TestDisaggregatedTiFlashQuery(t *testing.T) {
require.NoError(t, err)
tk.MustQuery("explain select * from t1 where c1 < 2").Check(testkit.Rows(
"PartitionUnion_10 9970.00 root ",
"├─TableReader_15 3323.33 root data:ExchangeSender_14",
"├─TableReader_15 3323.33 root MppVersion: 1, data:ExchangeSender_14",
"│ └─ExchangeSender_14 3323.33 mpp[tiflash] ExchangeType: PassThrough",
"│ └─Selection_13 3323.33 mpp[tiflash] lt(test.t1.c1, 2)",
"│ └─TableFullScan_12 10000.00 mpp[tiflash] table:t1, partition:p0 keep order:false, stats:pseudo",
"├─TableReader_19 3323.33 root data:ExchangeSender_18",
"├─TableReader_19 3323.33 root MppVersion: 1, data:ExchangeSender_18",
"│ └─ExchangeSender_18 3323.33 mpp[tiflash] ExchangeType: PassThrough",
"│ └─Selection_17 3323.33 mpp[tiflash] lt(test.t1.c1, 2)",
"│ └─TableFullScan_16 10000.00 mpp[tiflash] table:t1, partition:p1 keep order:false, stats:pseudo",
"└─TableReader_23 3323.33 root data:ExchangeSender_22",
"└─TableReader_23 3323.33 root MppVersion: 1, data:ExchangeSender_22",
" └─ExchangeSender_22 3323.33 mpp[tiflash] ExchangeType: PassThrough",
" └─Selection_21 3323.33 mpp[tiflash] lt(test.t1.c1, 2)",
" └─TableFullScan_20 10000.00 mpp[tiflash] table:t1, partition:p2 keep order:false, stats:pseudo"))
Expand Down
1 change: 1 addition & 0 deletions kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ go_test(
"@com_github_pingcap_kvproto//pkg/coprocessor",
"@com_github_pingcap_kvproto//pkg/deadlock",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_tipb//go-tipb",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
Expand Down
114 changes: 111 additions & 3 deletions kv/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,65 @@ package kv

import (
"context"
"strconv"
"strings"
"time"

"github.com/pingcap/kvproto/pkg/mpp"
"github.com/pingcap/tipb/go-tipb"
)

// MppVersion indicates the mpp-version used to build mpp plan
type MppVersion int64

const (
// MppVersionV0 supports TiFlash version [~, ~]
MppVersionV0 MppVersion = iota

// MppVersionV1 supports TiFlash version [v6.6.x, ~]
MppVersionV1

// MppVersionV2
// MppVersionV3

mppVersionMax

newestMppVersion MppVersion = mppVersionMax - 1

// MppVersionUnspecified means the illegal or unspecified version, it only used in TiDB.
MppVersionUnspecified MppVersion = -1

// MppVersionUnspecifiedName denotes name of UNSPECIFIED mpp version
MppVersionUnspecifiedName string = "UNSPECIFIED"
)

// ToInt64 transforms MppVersion to int64
func (v MppVersion) ToInt64() int64 {
return int64(v)
}

// ToMppVersion transforms string to MppVersion
func ToMppVersion(name string) (MppVersion, bool) {
name = strings.ToUpper(name)
if name == MppVersionUnspecifiedName {
return MppVersionUnspecified, true
}
v, err := strconv.ParseInt(name, 10, 64)
if err != nil {
return MppVersionUnspecified, false
}
version := MppVersion(v)
if version >= MppVersionUnspecified && version <= newestMppVersion {
return version, true
}
return MppVersionUnspecified, false
}

// GetNewestMppVersion returns the mpp-version can be used in mpp plan
func GetNewestMppVersion() MppVersion {
return newestMppVersion
}

// MPPTaskMeta means the meta info such as location of a mpp task.
type MPPTaskMeta interface {
// GetAddress indicates which node this task should execute on.
Expand All @@ -40,7 +94,8 @@ type MPPTask struct {
ID int64 // mppTaskID
StartTs uint64
MppQueryID MPPQueryID
TableID int64 // physical table id
TableID int64 // physical table id
MppVersion MppVersion // mpp version

PartitionTableIDs []int64
IsDisaggregatedTiFlashStaticPrune bool
Expand All @@ -54,6 +109,7 @@ func (t *MPPTask) ToPB() *mpp.TaskMeta {
LocalQueryId: t.MppQueryID.LocalQueryID,
ServerId: t.MppQueryID.ServerID,
TaskId: t.ID,
MppVersion: t.MppVersion.ToInt64(),
}
if t.ID != -1 {
meta.Address = t.Meta.GetAddress()
Expand Down Expand Up @@ -94,9 +150,8 @@ type MPPClient interface {
// ConstructMPPTasks schedules task for a plan fragment.
// TODO:: This interface will be refined after we support more executors.
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, time.Duration) ([]MPPTaskMeta, error)

// DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data.
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool, startTs uint64, mppQueryID MPPQueryID) Response
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool, startTs uint64, mppQueryID MPPQueryID, mppVersion MppVersion) Response
}

// MPPBuildTasksRequest request the stores allocation for a mpp plan fragment.
Expand All @@ -107,3 +162,56 @@ type MPPBuildTasksRequest struct {

PartitionIDAndRanges []PartitionIDAndRanges
}

// ExchangeCompressionMode means the compress method used in exchange operator
type ExchangeCompressionMode int

const (
// ExchangeCompressionModeNONE indicates no compression
ExchangeCompressionModeNONE ExchangeCompressionMode = iota
// ExchangeCompressionModeFast indicates fast compression/decompression speed, compression ratio is lower than HC mode
ExchangeCompressionModeFast
// ExchangeCompressionModeHC indicates high compression (HC) ratio mode
ExchangeCompressionModeHC
// ExchangeCompressionModeUnspecified indicates unspecified compress method, let TiDB choose one
ExchangeCompressionModeUnspecified

// RecommendedExchangeCompressionMode indicates recommended compression mode
RecommendedExchangeCompressionMode ExchangeCompressionMode = ExchangeCompressionModeFast

exchangeCompressionModeUnspecifiedName string = "UNSPECIFIED"
)

// Name returns the name of ExchangeCompressionMode
func (t ExchangeCompressionMode) Name() string {
if t == ExchangeCompressionModeUnspecified {
return exchangeCompressionModeUnspecifiedName
}
return t.ToTipbCompressionMode().String()
}

// ToExchangeCompressionMode returns the ExchangeCompressionMode from name
func ToExchangeCompressionMode(name string) (ExchangeCompressionMode, bool) {
name = strings.ToUpper(name)
if name == exchangeCompressionModeUnspecifiedName {
return ExchangeCompressionModeUnspecified, true
}
value, ok := tipb.CompressionMode_value[name]
if ok {
return ExchangeCompressionMode(value), true
}
return ExchangeCompressionModeNONE, false
}

// ToTipbCompressionMode returns tipb.CompressionMode from kv.ExchangeCompressionMode
func (t ExchangeCompressionMode) ToTipbCompressionMode() tipb.CompressionMode {
switch t {
case ExchangeCompressionModeNONE:
return tipb.CompressionMode_NONE
case ExchangeCompressionModeFast:
return tipb.CompressionMode_FAST
case ExchangeCompressionModeHC:
return tipb.CompressionMode_HIGH_COMPRESSION
}
return tipb.CompressionMode_NONE
}
55 changes: 55 additions & 0 deletions kv/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package kv
import (
"testing"

"github.com/pingcap/tipb/go-tipb"
"github.com/stretchr/testify/assert"
)

Expand All @@ -30,3 +31,57 @@ func TestVersion(t *testing.T) {
assert.True(t, eq == 0)
assert.True(t, MinVersion.Cmp(MaxVersion) < 0)
}

func TestMppVersion(t *testing.T) {
assert.Equal(t, int64(1), GetNewestMppVersion().ToInt64())
{
v, ok := ToMppVersion("unspecified")
assert.True(t, ok)
assert.Equal(t, v, MppVersionUnspecified)
}
{
v, ok := ToMppVersion("-1")
assert.True(t, ok)
assert.Equal(t, v, MppVersionUnspecified)
}
{
v, ok := ToMppVersion("0")
assert.True(t, ok)
assert.Equal(t, v, MppVersionV0)
}
{
v, ok := ToMppVersion("1")
assert.True(t, ok)
assert.Equal(t, v, MppVersionV1)
}
}

func TestExchangeCompressionMode(t *testing.T) {
assert.Equal(t, "UNSPECIFIED", ExchangeCompressionModeUnspecified.Name())
{
a, ok := ToExchangeCompressionMode("UNSPECIFIED")
assert.Equal(t, a, ExchangeCompressionModeUnspecified)
assert.True(t, ok)
}
assert.Equal(t, "NONE", ExchangeCompressionModeNONE.Name())
{
a, ok := ToExchangeCompressionMode("NONE")
assert.Equal(t, a, ExchangeCompressionModeNONE)
assert.True(t, ok)
}
assert.Equal(t, "FAST", ExchangeCompressionModeFast.Name())
{
a, ok := ToExchangeCompressionMode("FAST")
assert.Equal(t, a, ExchangeCompressionModeFast)
assert.True(t, ok)
}
assert.Equal(t, "HIGH_COMPRESSION", ExchangeCompressionModeHC.Name())
{
a, ok := ToExchangeCompressionMode("HIGH_COMPRESSION")
assert.Equal(t, a, ExchangeCompressionModeHC)
assert.True(t, ok)
}
// default `FAST`
assert.Equal(t, ExchangeCompressionModeFast, RecommendedExchangeCompressionMode)
assert.Equal(t, tipb.CompressionMode_FAST, RecommendedExchangeCompressionMode.ToTipbCompressionMode())
}
Loading

0 comments on commit e65cb0e

Please sign in to comment.