-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
protocol: support query attribute since mysql 8.0.23 #55175
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -21,6 +21,7 @@ import ( | |||||||
|
||||||||
"github.com/pingcap/errors" | ||||||||
"github.com/pingcap/tidb/pkg/expression/expropt" | ||||||||
"github.com/pingcap/tidb/pkg/param" | ||||||||
"github.com/pingcap/tidb/pkg/parser/ast" | ||||||||
"github.com/pingcap/tidb/pkg/parser/model" | ||||||||
"github.com/pingcap/tidb/pkg/parser/mysql" | ||||||||
|
@@ -46,6 +47,7 @@ var ( | |||||||
_ functionClass = &valuesFunctionClass{} | ||||||||
_ functionClass = &bitCountFunctionClass{} | ||||||||
_ functionClass = &getParamFunctionClass{} | ||||||||
_ functionClass = &getQueryAttrFunctionClass{} | ||||||||
) | ||||||||
|
||||||||
var ( | ||||||||
|
@@ -1676,3 +1678,60 @@ func (b *builtinGetParamStringSig) evalString(ctx EvalContext, row chunk.Row) (s | |||||||
} | ||||||||
return str, false, nil | ||||||||
} | ||||||||
|
||||||||
// getQueryAttrFunctionClass for plan cache of prepared statements | ||||||||
type getQueryAttrFunctionClass struct { | ||||||||
baseFunctionClass | ||||||||
} | ||||||||
|
||||||||
func (c *getQueryAttrFunctionClass) getFunction(ctx BuildContext, args []Expression) (builtinFunc, error) { | ||||||||
if err := c.verifyArgs(args); err != nil { | ||||||||
return nil, err | ||||||||
} | ||||||||
bf, err := newBaseBuiltinFuncWithTp(ctx, c.funcName, args, types.ETString, types.ETString) | ||||||||
if err != nil { | ||||||||
return nil, err | ||||||||
} | ||||||||
bf.tp.SetFlen(mysql.MaxFieldVarCharLength) | ||||||||
sig := &builtinGetQueryAttrStringSig{baseBuiltinFunc: bf} | ||||||||
return sig, nil | ||||||||
} | ||||||||
|
||||||||
type builtinGetQueryAttrStringSig struct { | ||||||||
baseBuiltinFunc | ||||||||
expropt.SessionVarsPropReader | ||||||||
} | ||||||||
|
||||||||
func (b *builtinGetQueryAttrStringSig) Clone() builtinFunc { | ||||||||
newSig := &builtinGetQueryAttrStringSig{} | ||||||||
newSig.cloneFrom(&b.baseBuiltinFunc) | ||||||||
return newSig | ||||||||
} | ||||||||
|
||||||||
func (b *builtinGetQueryAttrStringSig) RequiredOptionalEvalProps() OptionalEvalPropKeySet { | ||||||||
return b.SessionVarsPropReader.RequiredOptionalEvalProps() | ||||||||
} | ||||||||
|
||||||||
func (b *builtinGetQueryAttrStringSig) evalString(ctx EvalContext, row chunk.Row) (string, bool, error) { | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Often people know the function name and then have to fine the location of the code. This might help to make it easier to find. |
||||||||
sessionVars, err := b.GetSessionVars(ctx) | ||||||||
if err != nil { | ||||||||
return "", true, err | ||||||||
} | ||||||||
|
||||||||
varName, isNull, err := b.args[0].EvalString(ctx, row) | ||||||||
if isNull || err != nil { | ||||||||
return "", true, err | ||||||||
} | ||||||||
attrs := sessionVars.QueryAttributes | ||||||||
if attrs == nil { | ||||||||
return "", true, nil | ||||||||
} | ||||||||
if v, ok := attrs[varName]; ok { | ||||||||
paramData, err := ExecBinaryParam(sessionVars.StmtCtx.TypeCtx(), []param.BinaryParam{v}) | ||||||||
if err != nil { | ||||||||
return "", true, err | ||||||||
} | ||||||||
return paramData[0].EvalString(ctx, row) | ||||||||
} | ||||||||
return "", true, nil | ||||||||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -23,7 +23,7 @@ import ( | |||||
var ErrUnknownFieldType = dbterror.ClassServer.NewStd(errno.ErrUnknownFieldType) | ||||||
|
||||||
// BinaryParam stores the information decoded from the binary protocol | ||||||
// It can be further parsed into `expression.Expression` through the `ExecArgs` function in this package | ||||||
// It can be further parsed into `expression.Expression` through the expression.ExecBinaryParam function in expression package | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
type BinaryParam struct { | ||||||
Tp byte | ||||||
IsUnsigned bool | ||||||
|
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -67,6 +67,7 @@ import ( | |||||||||
"github.com/pingcap/tidb/pkg/kv" | ||||||||||
"github.com/pingcap/tidb/pkg/meta/model" | ||||||||||
"github.com/pingcap/tidb/pkg/metrics" | ||||||||||
"github.com/pingcap/tidb/pkg/param" | ||||||||||
"github.com/pingcap/tidb/pkg/parser" | ||||||||||
"github.com/pingcap/tidb/pkg/parser/ast" | ||||||||||
"github.com/pingcap/tidb/pkg/parser/auth" | ||||||||||
|
@@ -1337,6 +1338,7 @@ func (cc *clientConn) dispatch(ctx context.Context, data []byte) error { | |||||||||
|
||||||||||
cc.server.releaseToken(token) | ||||||||||
cc.lastActive = time.Now() | ||||||||||
cc.ctx.GetSessionVars().QueryAttributes = nil | ||||||||||
}() | ||||||||||
|
||||||||||
vars := cc.ctx.GetSessionVars() | ||||||||||
|
@@ -1373,8 +1375,14 @@ func (cc *clientConn) dispatch(ctx context.Context, data []byte) error { | |||||||||
// See http://dev.mysql.com/doc/internals/en/com-query.html | ||||||||||
if len(data) > 0 && data[len(data)-1] == 0 { | ||||||||||
data = data[:len(data)-1] | ||||||||||
dataStr = string(hack.String(data)) | ||||||||||
} | ||||||||||
pos, err := cc.parseQueryAttributes(ctx, data) | ||||||||||
if err != nil { | ||||||||||
return err | ||||||||||
} | ||||||||||
// fix lastPacket for display/log | ||||||||||
cc.lastPacket = append([]byte{cc.lastPacket[0]}, data[pos:]...) | ||||||||||
dataStr = string(hack.String(data[pos:])) | ||||||||||
return cc.handleQuery(ctx, dataStr) | ||||||||||
case mysql.ComFieldList: | ||||||||||
return cc.handleFieldList(ctx, dataStr) | ||||||||||
|
@@ -1698,6 +1706,61 @@ func (cc *clientConn) audit(eventType plugin.GeneralEvent) { | |||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
// parseQueryAttributes support query attributes since mysql 8.0.23 | ||||||||||
// see https://dev.mysql.com/doc/refman/8.0/en/query-attributes.html | ||||||||||
// https://archive.fosdem.org/2021/schedule/event/mysql_protocl/attachments/slides/4274/export/events/attachments/mysql_protocl/slides/4274/FOSDEM21_MySQL_Protocols_Query_Attributes.pdf | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
func (cc *clientConn) parseQueryAttributes(ctx context.Context, data []byte) (pos int, err error) { | ||||||||||
if cc.capability&mysql.ClientQueryAttributes > 0 { | ||||||||||
paraCount, _, np := util2.ParseLengthEncodedInt(data) | ||||||||||
numParams := int(paraCount) | ||||||||||
pos += np | ||||||||||
_, _, np = util2.ParseLengthEncodedInt(data[pos:]) | ||||||||||
pos += np | ||||||||||
ps := make([]param.BinaryParam, numParams) | ||||||||||
names := make([]string, numParams) | ||||||||||
if paraCount > 0 { | ||||||||||
var ( | ||||||||||
nullBitmaps []byte | ||||||||||
paramTypes []byte | ||||||||||
) | ||||||||||
cc.initInputEncoder(ctx) | ||||||||||
nullBitmapLen := (numParams + 7) >> 3 | ||||||||||
nullBitmaps = data[pos : pos+nullBitmapLen] | ||||||||||
pos += nullBitmapLen | ||||||||||
if data[pos] != 1 { | ||||||||||
return 0, mysql.ErrMalformPacket | ||||||||||
} | ||||||||||
|
||||||||||
pos++ | ||||||||||
for i := 0; i < numParams; i++ { | ||||||||||
paramTypes = append(paramTypes, data[pos:pos+2]...) | ||||||||||
pos += 2 | ||||||||||
s, _, p, e := util2.ParseLengthEncodedBytes(data[pos:]) | ||||||||||
if e != nil { | ||||||||||
return 0, mysql.ErrMalformPacket | ||||||||||
} | ||||||||||
names[i] = string(hack.String(s)) | ||||||||||
pos += p | ||||||||||
} | ||||||||||
|
||||||||||
boundParams := make([][]byte, numParams) | ||||||||||
p := 0 | ||||||||||
if p, err = parseBinaryParams(ps, boundParams, nullBitmaps, paramTypes, data[pos:], cc.inputDecoder); err != nil { | ||||||||||
return | ||||||||||
} | ||||||||||
|
||||||||||
pos += p | ||||||||||
psWithName := make(map[string]param.BinaryParam, numParams) | ||||||||||
for i := range names { | ||||||||||
psWithName[names[i]] = ps[i] | ||||||||||
} | ||||||||||
cc.ctx.GetSessionVars().QueryAttributes = psWithName | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
return | ||||||||||
} | ||||||||||
|
||||||||||
// handleQuery executes the sql query string and writes result set or result ok to the client. | ||||||||||
// As the execution time of this function represents the performance of TiDB, we do time log and metrics here. | ||||||||||
// Some special queries like `load data` that does not return result, which is handled in handleFileTransInConn. | ||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -54,6 +54,7 @@ import ( | |
"github.com/pingcap/tidb/pkg/server/internal/dump" | ||
"github.com/pingcap/tidb/pkg/server/internal/parse" | ||
"github.com/pingcap/tidb/pkg/server/internal/resultset" | ||
util2 "github.com/pingcap/tidb/pkg/server/internal/util" | ||
"github.com/pingcap/tidb/pkg/sessionctx/variable" | ||
"github.com/pingcap/tidb/pkg/sessiontxn" | ||
storeerr "github.com/pingcap/tidb/pkg/store/driver/error" | ||
|
@@ -180,6 +181,13 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e | |
) | ||
cc.initInputEncoder(ctx) | ||
numParams := stmt.NumParams() | ||
clientHasQueryAttr := cc.capability&mysql.ClientQueryAttributes > 0 | ||
if clientHasQueryAttr && (numParams > 0 || flag&mysql.ParameterCountAvailable > 0) { | ||
paraCount, _, np := util2.ParseLengthEncodedInt(data[pos:]) | ||
numParams = int(paraCount) | ||
pos += np | ||
} | ||
|
||
args := make([]param.BinaryParam, numParams) | ||
if numParams > 0 { | ||
nullBitmapLen := (numParams + 7) >> 3 | ||
|
@@ -192,12 +200,28 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e | |
// new param bound flag | ||
if data[pos] == 1 { | ||
pos++ | ||
if len(data) < (pos + (numParams << 1)) { | ||
return mysql.ErrMalformPacket | ||
// For client that has query attribute ability, parameter's name will also be sent. | ||
// However, it is useless for execute statement, so we ignore it here. | ||
if clientHasQueryAttr { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we also set There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I do not think set |
||
for i := 0; i < numParams; i++ { | ||
paramTypes = append(paramTypes, data[pos:pos+2]...) | ||
pos += 2 | ||
// parse names | ||
_, _, p, e := util2.ParseLengthEncodedBytes(data[pos:]) | ||
if e != nil { | ||
return mysql.ErrMalformPacket | ||
} | ||
pos += p | ||
} | ||
} else { | ||
if len(data) < (pos + (numParams << 1)) { | ||
return mysql.ErrMalformPacket | ||
} | ||
|
||
paramTypes = data[pos : pos+(numParams<<1)] | ||
pos += numParams << 1 | ||
} | ||
|
||
paramTypes = data[pos : pos+(numParams<<1)] | ||
pos += numParams << 1 | ||
paramValues = data[pos:] | ||
// Just the first StmtExecute packet contain parameters type, | ||
// we need save it for further use. | ||
|
@@ -206,7 +230,7 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e | |
paramValues = data[pos+1:] | ||
} | ||
|
||
err = parseBinaryParams(args, stmt.BoundParams(), nullBitmaps, stmt.GetParamsType(), paramValues, cc.inputDecoder) | ||
_, err = parseBinaryParams(args, stmt.BoundParams(), nullBitmaps, stmt.GetParamsType(), paramValues, cc.inputDecoder) | ||
// This `.Reset` resets the arguments, so it's fine to just ignore the error (and the it'll be reset again in the following routine) | ||
errReset := stmt.Reset() | ||
if errReset != nil { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this belongs under "TiDB Sequence function." Maybe think of a good name or leave it out if it is obvious.