Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: add MppVersion for mpp task; support data compression in Exchange Operator; #40132

Merged
merged 98 commits into from
Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from 86 commits
Commits
Show all changes
98 commits
Select commit Hold shift + click to select a range
7d31a46
wip
solotzg Dec 14, 2022
5f306af
1
solotzg Dec 15, 2022
69c5276
2
solotzg Dec 16, 2022
880f15f
3
solotzg Dec 16, 2022
82c0c9b
4
solotzg Dec 16, 2022
62dcad7
5
solotzg Dec 16, 2022
9269726
6
solotzg Dec 19, 2022
5ea73ec
7
solotzg Dec 20, 2022
8f9bba1
8
solotzg Dec 20, 2022
5a907e3
9
solotzg Dec 20, 2022
96116d6
10
solotzg Dec 20, 2022
18687bb
11
solotzg Dec 20, 2022
31c627d
12
solotzg Dec 20, 2022
90ffb98
13
solotzg Dec 21, 2022
0940576
14
solotzg Dec 21, 2022
e77a626
15
solotzg Dec 21, 2022
3d367f5
16
solotzg Dec 22, 2022
79d5674
17
solotzg Dec 22, 2022
f462d70
18
solotzg Dec 22, 2022
dcd1e06
19
solotzg Dec 22, 2022
99da3db
20
solotzg Dec 22, 2022
0a80fdd
21
solotzg Dec 22, 2022
e3b1acb
22
solotzg Dec 22, 2022
2985825
23
solotzg Dec 22, 2022
aa5e11f
24
solotzg Dec 23, 2022
dd0b8f6
25
solotzg Dec 23, 2022
704e4d6
26
solotzg Dec 23, 2022
6b0a6b1
27
solotzg Dec 23, 2022
587f73c
28
solotzg Dec 23, 2022
0c7c08f
29
solotzg Dec 23, 2022
a7bce2d
30
solotzg Dec 23, 2022
c7bc5bc
31
solotzg Dec 26, 2022
6069e3f
32
solotzg Dec 26, 2022
30fb373
33
solotzg Dec 27, 2022
35cbcb5
34
solotzg Dec 27, 2022
03a7d2d
32
solotzg Jan 5, 2023
898a135
Merge remote-tracking branch 'origin/mpp-version' into mpp-version
solotzg Jan 5, 2023
4cc0e9e
35
solotzg Jan 5, 2023
384c450
36
solotzg Jan 5, 2023
ee5d646
37
solotzg Jan 5, 2023
a2e65e2
38
solotzg Jan 5, 2023
d72b039
39
solotzg Jan 5, 2023
144eace
40
solotzg Jan 5, 2023
679cd20
41
solotzg Jan 5, 2023
aa82fde
42
solotzg Jan 5, 2023
f83416e
43
solotzg Jan 6, 2023
748269e
44
solotzg Jan 6, 2023
b66c56f
45
solotzg Jan 6, 2023
6dd1cfd
46
solotzg Jan 6, 2023
76bd03e
Merge remote-tracking branch 'pingcap/master' into mpp-version
solotzg Jan 6, 2023
d60efd8
47
solotzg Jan 6, 2023
6c5d44e
48
solotzg Jan 6, 2023
ef8b50f
49
solotzg Jan 6, 2023
116b101
50
solotzg Jan 9, 2023
723e944
fmt
solotzg Jan 9, 2023
5544e98
51
solotzg Jan 9, 2023
02d084f
52
solotzg Jan 9, 2023
9756882
53
solotzg Jan 9, 2023
f38459c
54
solotzg Jan 9, 2023
ea77376
55
solotzg Jan 9, 2023
c19e66d
56
solotzg Jan 10, 2023
b2bd225
57
solotzg Jan 10, 2023
9cbdd8f
Merge remote-tracking branch 'pingcap/master' into mpp-version
solotzg Jan 12, 2023
ff46a98
58
solotzg Jan 12, 2023
9bc7fa0
59
solotzg Jan 12, 2023
9a86cfc
60
solotzg Jan 12, 2023
f5ced51
61
solotzg Jan 12, 2023
b106a7d
62
solotzg Jan 12, 2023
c447c76
63
solotzg Jan 12, 2023
b9037f4
64
solotzg Jan 12, 2023
5ed30ae
65
solotzg Jan 12, 2023
dfad08b
Merge remote-tracking branch 'pingcap/master' into mpp-version
solotzg Jan 13, 2023
70cb6e6
planner: support data compression in FineGrainedShuffle (#40578)
solotzg Jan 13, 2023
f8fbefb
Merge remote-tracking branch 'pingcap/master' into mpp-version
solotzg Jan 28, 2023
ecab8d0
66
solotzg Jan 28, 2023
79abaa6
67
solotzg Jan 28, 2023
867a319
68
solotzg Jan 28, 2023
ca0c124
fix ut
solotzg Jan 28, 2023
2163b90
remove useless mppVersion member vars
solotzg Jan 29, 2023
b3f8456
change set mpp_exchange_compression_mode
solotzg Jan 29, 2023
a9f3f46
add warning when set mpp_exchange_compression_mode
solotzg Jan 29, 2023
57a6ceb
remove more useless mpp version fields
solotzg Jan 30, 2023
672287f
Merge branch 'master' into mpp-version
solotzg Jan 30, 2023
a3f1a8d
69
solotzg Jan 30, 2023
dc8c1a7
rename vars
solotzg Jan 30, 2023
4d4a7ab
Merge remote-tracking branch 'pingcap/master' into mpp-version
solotzg Jan 30, 2023
65654bf
Merge branch 'master' into mpp-version
solotzg Jan 30, 2023
b842464
70
solotzg Feb 1, 2023
dbd9d32
Merge branch 'master' into mpp-version
solotzg Feb 1, 2023
8910bc0
fix ut
solotzg Feb 1, 2023
ef7ad1f
Merge remote-tracking branch 'pingcap/master' into mpp-version
solotzg Feb 1, 2023
e5bab3b
Merge branch 'master' into mpp-version
solotzg Feb 1, 2023
3d459ed
Merge branch 'master' into mpp-version
ti-chi-bot Feb 1, 2023
c782936
Merge branch 'master' into mpp-version
ti-chi-bot Feb 1, 2023
693e0de
Merge branch 'master' into mpp-version
ti-chi-bot Feb 1, 2023
c44d24f
Merge remote-tracking branch 'pingcap/master' into mpp-version
solotzg Feb 1, 2023
d5c6785
Merge branch 'master' into mpp-version
ti-chi-bot Feb 2, 2023
100cd76
Merge branch 'master' into mpp-version
ti-chi-bot Feb 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.
ctx = WithSQLKvExecCounterInterceptor(ctx, sctx.GetSessionVars().StmtCtx)
_, allowTiFlashFallback := sctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash]
ctx = SetTiFlashMaxThreadsInContext(ctx, sctx)
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks, allowTiFlashFallback, startTs, mppQueryID)
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks, allowTiFlashFallback, startTs, mppQueryID, sctx.GetSessionVars().ChooseMppVersion())
if resp == nil {
return nil, errors.New("client returns nil response")
}
Expand Down
1 change: 1 addition & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3401,6 +3401,7 @@ func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Exe
b.err = err
return nil
}

gather := &MPPGather{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
is: b.is,
Expand Down
6 changes: 5 additions & 1 deletion executor/mpp_gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,14 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error {
if err != nil {
return errors.Trace(err)
}

logutil.BgLogger().Info("Dispatch mpp task", zap.Uint64("timestamp", mppTask.StartTs),
zap.Int64("ID", mppTask.ID), zap.Uint64("QueryTs", mppTask.MppQueryID.QueryTs), zap.Uint64("LocalQueryId", mppTask.MppQueryID.LocalQueryID),
zap.Uint64("ServerID", mppTask.MppQueryID.ServerID), zap.String("address", mppTask.Meta.GetAddress()),
zap.String("plan", plannercore.ToString(pf.ExchangeSender)))
zap.String("plan", plannercore.ToString(pf.ExchangeSender)),
zap.Int64("mpp-version", mppTask.MppVersion.ToInt64()),
zap.String("exchange-compression-mode", pf.ExchangeSender.CompressionMode.Name()),
)
req := &kv.MPPDispatchRequest{
Data: pbData,
Meta: mppTask.Meta,
Expand Down
56 changes: 56 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2071,3 +2071,59 @@ func TestSetChunkReuseVariable(t *testing.T) {
// error value
tk.MustGetErrCode("set @@tidb_enable_reuse_chunk=s;", errno.ErrWrongValueForVar)
}

func TestSetMppVersionVariable(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustQuery("select @@session.mpp_version").Check(testkit.Rows("UNSPECIFIED"))
tk.MustExec("SET SESSION mpp_version = -1")
tk.MustQuery("select @@session.mpp_version").Check(testkit.Rows("-1"))
tk.MustExec("SET SESSION mpp_version = 0")
tk.MustQuery("select @@session.mpp_version").Check(testkit.Rows("0"))
tk.MustExec("SET SESSION mpp_version = 1")
tk.MustQuery("select @@session.mpp_version").Check(testkit.Rows("1"))
tk.MustExec("SET SESSION mpp_version = unspecified")
tk.MustQuery("select @@session.mpp_version").Check(testkit.Rows("unspecified"))
{
tk.MustGetErrMsg("SET SESSION mpp_version = 2", "incorrect value: 2. mpp_version options: -1 (unspecified), 0, 1")
}
{
tk.MustExec("SET GLOBAL mpp_version = 1")
tk.MustQuery("select @@global.mpp_version").Check(testkit.Rows("1"))
tk.MustExec("SET GLOBAL mpp_version = -1")
tk.MustQuery("select @@global.mpp_version").Check(testkit.Rows("-1"))
}
}

func TestSetMppExchangeCompressionModeVariable(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustGetErrMsg(
"SET SESSION mpp_exchange_compression_mode = 123",
"incorrect value: `123`. mpp_exchange_compression_mode options: NONE, FAST, HIGH_COMPRESSION, UNSPECIFIED")
tk.MustQuery("select @@session.mpp_exchange_compression_mode").Check(testkit.Rows("UNSPECIFIED"))

tk.MustExec("SET SESSION mpp_exchange_compression_mode = none")
tk.MustQuery("select @@session.mpp_exchange_compression_mode").Check(testkit.Rows("none"))
tk.MustExec("SET SESSION mpp_exchange_compression_mode = fast")
tk.MustQuery("select @@session.mpp_exchange_compression_mode").Check(testkit.Rows("fast"))
tk.MustExec("SET SESSION mpp_exchange_compression_mode = HIGH_COMPRESSION")
tk.MustQuery("select @@session.mpp_exchange_compression_mode").Check(testkit.Rows("HIGH_COMPRESSION"))

{
tk.MustExec("SET GLOBAL mpp_exchange_compression_mode = none")
tk.MustQuery("select @@global.mpp_exchange_compression_mode").Check(testkit.Rows("none"))
}
{
tk.MustExec("SET mpp_version = 0")
tk.MustExec("SET mpp_exchange_compression_mode = unspecified")
require.Equal(t, len(tk.Session().GetSessionVars().StmtCtx.GetWarnings()), 0)
}
{
tk.MustExec("SET mpp_version = 0")
tk.MustExec("SET mpp_exchange_compression_mode = HIGH_COMPRESSION")
warnings := tk.Session().GetSessionVars().StmtCtx.GetWarnings()
require.Equal(t, len(warnings), 1)
require.Equal(t, warnings[0].Err.Error(), "mpp exchange compression won't work under current mpp version 0")
}
}
1 change: 1 addition & 0 deletions executor/tiflashtest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_test(
"//config",
"//domain",
"//executor",
"//kv",
"//meta/autoid",
"//parser/terror",
"//planner/core",
Expand Down
40 changes: 36 additions & 4 deletions executor/tiflashtest/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/terror"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/store/mockstore"
Expand Down Expand Up @@ -628,10 +629,41 @@ func TestDispatchTaskRetry(t *testing.T) {
require.NoError(t, err)
tk.MustExec("set @@session.tidb_enforce_mpp=ON")
require.Nil(t, failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/mppDispatchTimeout", "3*return(true)"))
tk.MustQuery("select count(*) from t").Check(testkit.Rows("4"))
tk.MustQuery("select count(*) from t group by b").Check(testkit.Rows("4"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

select count(*) from t will not use mpp plan.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change this line to

	tk.MustQuery("explain analyze select count(*) from t").Check(testkit.Rows("4"))

we will get result

        	            	+[StreamAgg_26 1.00 1 root  time:772.4µs, loops:2 funcs:count(Column#6)->Column#3 388 Bytes N/A]
        	            	+[└─TableReader_27 1.00 1 root  time:770.5µs, loops:2, cop_task: {num: 1, max: 0s, proc_keys: 0, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15} data:StreamAgg_10 22 Bytes N/A]
        	            	+[  └─StreamAgg_10 1.00 0 batchCop[tiflash]   funcs:count(test.t.a)->Column#6 N/A N/A]
        	            	+[    └─TableFullScan_25 10000.00 0 batchCop[tiflash] table:t  keep order:false, stats:pseudo N/A N/A]

require.Nil(t, failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/mppDispatchTimeout"))
}

func TestMppVersionError(t *testing.T) {
store := testkit.CreateMockStore(t, withMockTiFlash(2))
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int not null primary key, b int not null)")
tk.MustExec("alter table t set tiflash replica 1")
tk.MustExec("insert into t values(1,0),(2,0),(3,0),(4,0)")
tb := external.GetTableByName(t, tk, "test", "t")
err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
tk.MustExec("set @@session.tidb_enforce_mpp=ON")
{
item := fmt.Sprintf("return(%d)", kv.GetNewestMppVersion()+1)
require.Nil(t, failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/MppVersionError", item))
}
{
err := tk.QueryToErr("select count(*) from t group by b")
require.Error(t, err)
}
require.Nil(t, failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/MppVersionError"))
{
item := fmt.Sprintf("return(%d)", kv.GetNewestMppVersion())
require.Nil(t, failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/MppVersionError", item))
}
{
tk.MustQuery("select count(*) from t group by b").Check(testkit.Rows("4"))
}
require.Nil(t, failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/MppVersionError"))
}

func TestCancelMppTasks(t *testing.T) {
var hang = "github.com/pingcap/tidb/store/mockstore/unistore/mppRecvHang"
store := testkit.CreateMockStore(t, withMockTiFlash(2))
Expand Down Expand Up @@ -1319,15 +1351,15 @@ func TestDisaggregatedTiFlashQuery(t *testing.T) {
require.NoError(t, err)
tk.MustQuery("explain select * from t1 where c1 < 2").Check(testkit.Rows(
"PartitionUnion_10 9970.00 root ",
"├─TableReader_15 3323.33 root data:ExchangeSender_14",
"├─TableReader_15 3323.33 root MppVersion: 1, data:ExchangeSender_14",
"│ └─ExchangeSender_14 3323.33 mpp[tiflash] ExchangeType: PassThrough",
"│ └─Selection_13 3323.33 mpp[tiflash] lt(test.t1.c1, 2)",
"│ └─TableFullScan_12 10000.00 mpp[tiflash] table:t1, partition:p0 keep order:false, stats:pseudo",
"├─TableReader_19 3323.33 root data:ExchangeSender_18",
"├─TableReader_19 3323.33 root MppVersion: 1, data:ExchangeSender_18",
"│ └─ExchangeSender_18 3323.33 mpp[tiflash] ExchangeType: PassThrough",
"│ └─Selection_17 3323.33 mpp[tiflash] lt(test.t1.c1, 2)",
"│ └─TableFullScan_16 10000.00 mpp[tiflash] table:t1, partition:p1 keep order:false, stats:pseudo",
"└─TableReader_23 3323.33 root data:ExchangeSender_22",
"└─TableReader_23 3323.33 root MppVersion: 1, data:ExchangeSender_22",
" └─ExchangeSender_22 3323.33 mpp[tiflash] ExchangeType: PassThrough",
" └─Selection_21 3323.33 mpp[tiflash] lt(test.t1.c1, 2)",
" └─TableFullScan_20 10000.00 mpp[tiflash] table:t1, partition:p2 keep order:false, stats:pseudo"))
Expand Down
1 change: 1 addition & 0 deletions kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ go_test(
"@com_github_pingcap_kvproto//pkg/coprocessor",
"@com_github_pingcap_kvproto//pkg/deadlock",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_tipb//go-tipb",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
Expand Down
114 changes: 111 additions & 3 deletions kv/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,65 @@ package kv

import (
"context"
"strconv"
"strings"
"time"

"github.com/pingcap/kvproto/pkg/mpp"
"github.com/pingcap/tipb/go-tipb"
)

// MppVersion indicates the mpp-version used to build mpp plan
type MppVersion int64

const (
// MppVersionV0 supports TiFlash version [~, ~]
MppVersionV0 MppVersion = iota

// MppVersionV1 supports TiFlash version [v6.6.x, ~]
MppVersionV1

// MppVersionV2
// MppVersionV3

mppVersionMax

newestMppVersion MppVersion = mppVersionMax - 1

// MppVersionUnspecified means the illegal or unspecified version, it only used in TiDB.
MppVersionUnspecified MppVersion = -1

// MppVersionUnspecifiedName denotes name of UNSPECIFIED mpp version
MppVersionUnspecifiedName string = "UNSPECIFIED"
)

// ToInt64 transforms MppVersion to int64
func (v MppVersion) ToInt64() int64 {
return int64(v)
}

// ToMppVersion transforms string to MppVersion
func ToMppVersion(name string) (MppVersion, bool) {
name = strings.ToUpper(name)
if name == MppVersionUnspecifiedName {
return MppVersionUnspecified, true
}
v, err := strconv.ParseInt(name, 10, 64)
if err != nil {
return MppVersionUnspecified, false
}
version := MppVersion(v)
if version >= MppVersionUnspecified && version <= newestMppVersion {
return version, true
}
return MppVersionUnspecified, false
}

// GetNewestMppVersion returns the mpp-version can be used in mpp plan
func GetNewestMppVersion() MppVersion {
return newestMppVersion
}

// MPPTaskMeta means the meta info such as location of a mpp task.
type MPPTaskMeta interface {
// GetAddress indicates which node this task should execute on.
Expand All @@ -40,7 +94,8 @@ type MPPTask struct {
ID int64 // mppTaskID
StartTs uint64
MppQueryID MPPQueryID
TableID int64 // physical table id
TableID int64 // physical table id
MppVersion MppVersion // mpp version

PartitionTableIDs []int64
IsDisaggregatedTiFlashStaticPrune bool
Expand All @@ -54,6 +109,7 @@ func (t *MPPTask) ToPB() *mpp.TaskMeta {
LocalQueryId: t.MppQueryID.LocalQueryID,
ServerId: t.MppQueryID.ServerID,
TaskId: t.ID,
MppVersion: t.MppVersion.ToInt64(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we support MppVersionUnspecified in TiFlash?

Copy link
Contributor Author

@solotzg solotzg Jan 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, unspecified mpp version(-1) only used in tidb. Func ChooseMppVersion will select newest mpp version if unspecified.

}
if t.ID != -1 {
meta.Address = t.Meta.GetAddress()
Expand Down Expand Up @@ -94,9 +150,8 @@ type MPPClient interface {
// ConstructMPPTasks schedules task for a plan fragment.
// TODO:: This interface will be refined after we support more executors.
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, time.Duration) ([]MPPTaskMeta, error)

// DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data.
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool, startTs uint64, mppQueryID MPPQueryID) Response
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool, startTs uint64, mppQueryID MPPQueryID, mppVersion MppVersion) Response
}

// MPPBuildTasksRequest request the stores allocation for a mpp plan fragment.
Expand All @@ -107,3 +162,56 @@ type MPPBuildTasksRequest struct {

PartitionIDAndRanges []PartitionIDAndRanges
}

// ExchangeCompressionMode means the compress method used in exchange operator
type ExchangeCompressionMode int

const (
// ExchangeCompressionModeNONE indicates no compression
ExchangeCompressionModeNONE ExchangeCompressionMode = iota
// ExchangeCompressionModeFast indicates fast compression/decompression speed, compression ratio is lower than HC mode
ExchangeCompressionModeFast
// ExchangeCompressionModeHC indicates high compression (HC) ratio mode
ExchangeCompressionModeHC
// ExchangeCompressionModeUnspecified indicates unspecified compress method, let TiDB choose one
ExchangeCompressionModeUnspecified

// RecommendedExchangeCompressionMode indicates recommended compression mode
RecommendedExchangeCompressionMode ExchangeCompressionMode = ExchangeCompressionModeFast

exchangeCompressionModeUnspecifiedName string = "UNSPECIFIED"
)

// Name returns the name of ExchangeCompressionMode
func (t ExchangeCompressionMode) Name() string {
if t == ExchangeCompressionModeUnspecified {
return exchangeCompressionModeUnspecifiedName
}
return t.ToTipbCompressionMode().String()
}

// ToExchangeCompressionMode returns the ExchangeCompressionMode from name
func ToExchangeCompressionMode(name string) (ExchangeCompressionMode, bool) {
name = strings.ToUpper(name)
if name == exchangeCompressionModeUnspecifiedName {
return ExchangeCompressionModeUnspecified, true
}
value, ok := tipb.CompressionMode_value[name]
if ok {
return ExchangeCompressionMode(value), true
}
return ExchangeCompressionModeNONE, false
}

// ToTipbCompressionMode returns tipb.CompressionMode from kv.ExchangeCompressionMode
func (t ExchangeCompressionMode) ToTipbCompressionMode() tipb.CompressionMode {
switch t {
case ExchangeCompressionModeNONE:
return tipb.CompressionMode_NONE
case ExchangeCompressionModeFast:
return tipb.CompressionMode_FAST
case ExchangeCompressionModeHC:
return tipb.CompressionMode_HIGH_COMPRESSION
}
return tipb.CompressionMode_NONE
}
55 changes: 55 additions & 0 deletions kv/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package kv
import (
"testing"

"github.com/pingcap/tipb/go-tipb"
"github.com/stretchr/testify/assert"
)

Expand All @@ -30,3 +31,57 @@ func TestVersion(t *testing.T) {
assert.True(t, eq == 0)
assert.True(t, MinVersion.Cmp(MaxVersion) < 0)
}

func TestMppVersion(t *testing.T) {
assert.Equal(t, int64(1), GetNewestMppVersion().ToInt64())
{
v, ok := ToMppVersion("unspecified")
assert.True(t, ok)
assert.Equal(t, v, MppVersionUnspecified)
}
{
v, ok := ToMppVersion("-1")
assert.True(t, ok)
assert.Equal(t, v, MppVersionUnspecified)
}
{
v, ok := ToMppVersion("0")
assert.True(t, ok)
assert.Equal(t, v, MppVersionV0)
}
{
v, ok := ToMppVersion("1")
assert.True(t, ok)
assert.Equal(t, v, MppVersionV1)
}
}

func TestExchangeCompressionMode(t *testing.T) {
assert.Equal(t, "UNSPECIFIED", ExchangeCompressionModeUnspecified.Name())
{
a, ok := ToExchangeCompressionMode("UNSPECIFIED")
assert.Equal(t, a, ExchangeCompressionModeUnspecified)
assert.True(t, ok)
}
assert.Equal(t, "NONE", ExchangeCompressionModeNONE.Name())
{
a, ok := ToExchangeCompressionMode("NONE")
assert.Equal(t, a, ExchangeCompressionModeNONE)
assert.True(t, ok)
}
assert.Equal(t, "FAST", ExchangeCompressionModeFast.Name())
{
a, ok := ToExchangeCompressionMode("FAST")
assert.Equal(t, a, ExchangeCompressionModeFast)
assert.True(t, ok)
}
assert.Equal(t, "HIGH_COMPRESSION", ExchangeCompressionModeHC.Name())
{
a, ok := ToExchangeCompressionMode("HIGH_COMPRESSION")
assert.Equal(t, a, ExchangeCompressionModeHC)
assert.True(t, ok)
}
// default `FAST`
assert.Equal(t, ExchangeCompressionModeFast, RecommendedExchangeCompressionMode)
assert.Equal(t, tipb.CompressionMode_FAST, RecommendedExchangeCompressionMode.ToTipbCompressionMode())
}
Loading