-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
planner: add MppVersion for mpp task; support data compression in Exchange Operator; #40132
Changes from all commits
7d31a46
5f306af
69c5276
880f15f
82c0c9b
62dcad7
9269726
5ea73ec
8f9bba1
5a907e3
96116d6
18687bb
31c627d
90ffb98
0940576
e77a626
3d367f5
79d5674
f462d70
dcd1e06
99da3db
0a80fdd
e3b1acb
2985825
aa5e11f
dd0b8f6
704e4d6
6b0a6b1
587f73c
0c7c08f
a7bce2d
c7bc5bc
6069e3f
30fb373
35cbcb5
03a7d2d
898a135
4cc0e9e
384c450
ee5d646
a2e65e2
d72b039
144eace
679cd20
aa82fde
f83416e
748269e
b66c56f
6dd1cfd
76bd03e
d60efd8
6c5d44e
ef8b50f
116b101
723e944
5544e98
02d084f
9756882
f38459c
ea77376
c19e66d
b2bd225
9cbdd8f
ff46a98
9bc7fa0
9a86cfc
f5ced51
b106a7d
c447c76
b9037f4
5ed30ae
dfad08b
70cb6e6
f8fbefb
ecab8d0
79abaa6
867a319
ca0c124
2163b90
b3f8456
a9f3f46
57a6ceb
672287f
a3f1a8d
dc8c1a7
4d4a7ab
65654bf
b842464
dbd9d32
8910bc0
ef7ad1f
e5bab3b
3d459ed
c782936
693e0de
c44d24f
d5c6785
100cd76
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,11 +16,65 @@ package kv | |
|
||
import ( | ||
"context" | ||
"strconv" | ||
"strings" | ||
"time" | ||
|
||
"github.com/pingcap/kvproto/pkg/mpp" | ||
"github.com/pingcap/tipb/go-tipb" | ||
) | ||
|
||
// MppVersion indicates the mpp-version used to build mpp plan | ||
type MppVersion int64 | ||
|
||
const ( | ||
// MppVersionV0 supports TiFlash version [~, ~] | ||
MppVersionV0 MppVersion = iota | ||
|
||
// MppVersionV1 supports TiFlash version [v6.6.x, ~] | ||
MppVersionV1 | ||
|
||
// MppVersionV2 | ||
// MppVersionV3 | ||
|
||
mppVersionMax | ||
|
||
newestMppVersion MppVersion = mppVersionMax - 1 | ||
|
||
// MppVersionUnspecified means the illegal or unspecified version, it only used in TiDB. | ||
MppVersionUnspecified MppVersion = -1 | ||
|
||
// MppVersionUnspecifiedName denotes name of UNSPECIFIED mpp version | ||
MppVersionUnspecifiedName string = "UNSPECIFIED" | ||
) | ||
|
||
// ToInt64 transforms MppVersion to int64 | ||
func (v MppVersion) ToInt64() int64 { | ||
return int64(v) | ||
} | ||
|
||
// ToMppVersion transforms string to MppVersion | ||
func ToMppVersion(name string) (MppVersion, bool) { | ||
name = strings.ToUpper(name) | ||
if name == MppVersionUnspecifiedName { | ||
return MppVersionUnspecified, true | ||
} | ||
v, err := strconv.ParseInt(name, 10, 64) | ||
if err != nil { | ||
return MppVersionUnspecified, false | ||
} | ||
version := MppVersion(v) | ||
if version >= MppVersionUnspecified && version <= newestMppVersion { | ||
return version, true | ||
} | ||
return MppVersionUnspecified, false | ||
} | ||
|
||
// GetNewestMppVersion returns the mpp-version can be used in mpp plan | ||
func GetNewestMppVersion() MppVersion { | ||
return newestMppVersion | ||
} | ||
|
||
// MPPTaskMeta means the meta info such as location of a mpp task. | ||
type MPPTaskMeta interface { | ||
// GetAddress indicates which node this task should execute on. | ||
|
@@ -40,7 +94,8 @@ type MPPTask struct { | |
ID int64 // mppTaskID | ||
StartTs uint64 | ||
MppQueryID MPPQueryID | ||
TableID int64 // physical table id | ||
TableID int64 // physical table id | ||
MppVersion MppVersion // mpp version | ||
|
||
PartitionTableIDs []int64 | ||
IsDisaggregatedTiFlashStaticPrune bool | ||
|
@@ -54,6 +109,7 @@ func (t *MPPTask) ToPB() *mpp.TaskMeta { | |
LocalQueryId: t.MppQueryID.LocalQueryID, | ||
ServerId: t.MppQueryID.ServerID, | ||
TaskId: t.ID, | ||
MppVersion: t.MppVersion.ToInt64(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we support There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, unspecified mpp version(-1) only used in tidb. Func |
||
} | ||
if t.ID != -1 { | ||
meta.Address = t.Meta.GetAddress() | ||
|
@@ -94,9 +150,8 @@ type MPPClient interface { | |
// ConstructMPPTasks schedules task for a plan fragment. | ||
// TODO:: This interface will be refined after we support more executors. | ||
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, time.Duration) ([]MPPTaskMeta, error) | ||
|
||
// DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data. | ||
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool, startTs uint64, mppQueryID MPPQueryID) Response | ||
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool, startTs uint64, mppQueryID MPPQueryID, mppVersion MppVersion) Response | ||
} | ||
|
||
// MPPBuildTasksRequest request the stores allocation for a mpp plan fragment. | ||
|
@@ -107,3 +162,56 @@ type MPPBuildTasksRequest struct { | |
|
||
PartitionIDAndRanges []PartitionIDAndRanges | ||
} | ||
|
||
// ExchangeCompressionMode means the compress method used in exchange operator | ||
type ExchangeCompressionMode int | ||
|
||
const ( | ||
// ExchangeCompressionModeNONE indicates no compression | ||
ExchangeCompressionModeNONE ExchangeCompressionMode = iota | ||
// ExchangeCompressionModeFast indicates fast compression/decompression speed, compression ratio is lower than HC mode | ||
ExchangeCompressionModeFast | ||
// ExchangeCompressionModeHC indicates high compression (HC) ratio mode | ||
ExchangeCompressionModeHC | ||
// ExchangeCompressionModeUnspecified indicates unspecified compress method, let TiDB choose one | ||
ExchangeCompressionModeUnspecified | ||
|
||
// RecommendedExchangeCompressionMode indicates recommended compression mode | ||
RecommendedExchangeCompressionMode ExchangeCompressionMode = ExchangeCompressionModeFast | ||
|
||
exchangeCompressionModeUnspecifiedName string = "UNSPECIFIED" | ||
) | ||
|
||
// Name returns the name of ExchangeCompressionMode | ||
func (t ExchangeCompressionMode) Name() string { | ||
if t == ExchangeCompressionModeUnspecified { | ||
return exchangeCompressionModeUnspecifiedName | ||
} | ||
return t.ToTipbCompressionMode().String() | ||
} | ||
|
||
// ToExchangeCompressionMode returns the ExchangeCompressionMode from name | ||
func ToExchangeCompressionMode(name string) (ExchangeCompressionMode, bool) { | ||
name = strings.ToUpper(name) | ||
if name == exchangeCompressionModeUnspecifiedName { | ||
return ExchangeCompressionModeUnspecified, true | ||
} | ||
value, ok := tipb.CompressionMode_value[name] | ||
if ok { | ||
return ExchangeCompressionMode(value), true | ||
} | ||
return ExchangeCompressionModeNONE, false | ||
} | ||
|
||
// ToTipbCompressionMode returns tipb.CompressionMode from kv.ExchangeCompressionMode | ||
func (t ExchangeCompressionMode) ToTipbCompressionMode() tipb.CompressionMode { | ||
switch t { | ||
case ExchangeCompressionModeNONE: | ||
return tipb.CompressionMode_NONE | ||
case ExchangeCompressionModeFast: | ||
return tipb.CompressionMode_FAST | ||
case ExchangeCompressionModeHC: | ||
return tipb.CompressionMode_HIGH_COMPRESSION | ||
} | ||
return tipb.CompressionMode_NONE | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why change this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
select count(*) from t
will not use mpp plan.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change this line to
we will get result