Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Support for Prepared Statements #5018

Merged
merged 43 commits into from
Aug 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
30cda05
support for the ComPrepare
dcadevil Jan 15, 2019
91f73b1
support for the MySQL prepare command protocol
dcadevil Jan 16, 2019
85f6d74
add missing function implementation
dcadevil Jan 16, 2019
6024566
delete unused code
dcadevil Jan 16, 2019
25d1643
add missing function implementation
dcadevil Jan 16, 2019
9f0e7c3
add missing function implementation
dcadevil Jan 16, 2019
24d0dd0
add comments
dcadevil Jan 17, 2019
2393ff7
Merge branch 'master' into prepare
dcadevil Apr 1, 2019
ed00790
Merge branch 'prepare' of git://github.com/tiglabs/vitess into tiglab…
deepthi Jun 17, 2019
82c5e90
fix compile errors after merge
deepthi Jun 17, 2019
24ea99f
Merge remote-tracking branch 'origin/master' into tiglabs-prepare
saifalharthi Jun 28, 2019
9ceb1f4
Merge remote-tracking branch 'origin/master' into tiglabs-prepare
saifalharthi Jun 30, 2019
e523677
Merge remote-tracking branch 'origin/master' into tiglabs-prepare
saifalharthi Jul 10, 2019
8d641ee
Added test for ComPrepare
saifalharthi Jul 15, 2019
96ed068
added few more tests in query_test
saifalharthi Jul 15, 2019
70096dd
Merge remote-tracking branch 'origin/master' into tiglabs-prepare
saifalharthi Jul 16, 2019
1470f76
Added executor tests. They are faulty now.
saifalharthi Jul 17, 2019
698243b
Added python end to end test
saifalharthi Jul 20, 2019
0d39607
Fix error message
saifalharthi Jul 20, 2019
bc1ac07
Added test for TestComStmtExecute
saifalharthi Jul 22, 2019
b72f38b
Merge remote-tracking branch 'origin/master' into tiglabs-prepare
saifalharthi Jul 22, 2019
d278419
Add Executor test for DML
saifalharthi Jul 22, 2019
69729e9
Added Excutor test for select
saifalharthi Jul 22, 2019
a51fabc
Added Executor test for select
saifalharthi Jul 22, 2019
387cb98
Merge branch 'tiglabs-prepare' of github.com:planetscale/vitess into …
saifalharthi Jul 22, 2019
ed69725
Added mysql-connector dependency
saifalharthi Jul 23, 2019
c55be51
Added dependency to bootstrap.sh and edited python test
saifalharthi Jul 23, 2019
85a4f18
Updated TestComPrepare test
saifalharthi Jul 24, 2019
6a01fe0
Replaced etcd2 with zk for end to end tests
saifalharthi Jul 24, 2019
611820a
Set correct value for topo-server
saifalharthi Jul 24, 2019
786f7d8
Fix TestTypeError
saifalharthi Jul 24, 2019
04c1100
Edit utils.py
saifalharthi Jul 24, 2019
5eb5830
Addressed partial comments
saifalharthi Jul 30, 2019
017cf57
Quick fix
saifalharthi Jul 30, 2019
03217bd
Document test
saifalharthi Aug 1, 2019
a138a40
Merge remote-tracking branch 'origin/master' into tiglabs-prepare
saifalharthi Aug 1, 2019
0fedf79
Fix Tests
saifalharthi Aug 7, 2019
3dc1bf5
Merge remote-tracking branch 'origin/master' into tiglabs-prepare
saifalharthi Aug 7, 2019
3f67a4a
Fix mount path
saifalharthi Aug 7, 2019
d7fb060
Merge remote-tracking branch 'origin/master' into tiglabs-prepare
saifalharthi Aug 9, 2019
58a810f
Fixed tests and bug in ComPrepare
saifalharthi Aug 9, 2019
d90e723
Address comments
saifalharthi Aug 9, 2019
112187e
Added end to end test fail ComPrepare and make sure other queries do …
saifalharthi Aug 9, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions bootstrap.sh
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ function install_grpc() {
PIP=$grpc_virtualenv/bin/pip
$PIP install --upgrade pip
$PIP install --upgrade --ignore-installed virtualenv
$PIP install mysql-connector-python

grpcio_ver=$version
$PIP install --upgrade grpcio=="$grpcio_ver" grpcio-tools=="$grpcio_ver"
Expand Down Expand Up @@ -360,6 +361,8 @@ if [ "$BUILD_TESTS" == 1 ] ; then
echo "$MYSQL_FLAVOR" > "$VTROOT/dist/MYSQL_FLAVOR"
fi

PYTHONPATH='' $PIP install mysql-connector-python

#
# 4. Installation of development related steps e.g. creating Git hooks.
#
Expand Down
248 changes: 243 additions & 5 deletions go/mysql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,22 @@ type Conn struct {
// currentEphemeralBuffer for tracking allocated temporary buffer for writes and reads respectively.
// It can be allocated from bufPool or heap and should be recycled in the same manner.
currentEphemeralBuffer *[]byte

// StatementID is the prepared statement ID.
StatementID uint32

// PrepareData is the map to use a prepared statement.
PrepareData map[uint32]*PrepareData
}

// PrepareData is a buffer used for store prepare statement meta data
type PrepareData struct {
StatementID uint32
PrepareStmt string
ParamsCount uint16
ParamsType []int32
ColumnNames []string
BindVars map[string]*querypb.BindVariable
}

// bufPool is used to allocate and free buffers in an efficient way.
Expand All @@ -182,9 +198,10 @@ func newConn(conn net.Conn) *Conn {
// size for reads.
func newServerConn(conn net.Conn, listener *Listener) *Conn {
c := &Conn{
conn: conn,
listener: listener,
closed: sync2.NewAtomicBool(false),
conn: conn,
listener: listener,
closed: sync2.NewAtomicBool(false),
PrepareData: make(map[uint32]*PrepareData),
}
if listener.connReadBufferSize > 0 {
c.bufferedReader = bufio.NewReaderSize(conn, listener.connReadBufferSize)
Expand Down Expand Up @@ -801,6 +818,226 @@ func (c *Conn) handleNextCommand(handler Handler) error {
return err
}
}
case ComPrepare:
query := c.parseComPrepare(data)
c.recycleReadPacket()

var queries []string
if c.Capabilities&CapabilityClientMultiStatements != 0 {
queries, err = sqlparser.SplitStatementToPieces(query)
if err != nil {
log.Errorf("Conn %v: Error splitting query: %v", c, err)
if werr := c.writeErrorPacketFromError(err); werr != nil {
// If we can't even write the error, we're done.
log.Errorf("Conn %v: Error writing query error: %v", c, werr)
return werr
}
}
} else {
queries = []string{query}
}

if len(queries) != 1 {
return fmt.Errorf("can not prepare multiple statements")
}

// Popoulate PrepareData
c.StatementID++
prepare := &PrepareData{
StatementID: c.StatementID,
PrepareStmt: queries[0],
saifalharthi marked this conversation as resolved.
Show resolved Hide resolved
}

statement, err := sqlparser.ParseStrictDDL(query)
if err != nil {
return err
}

paramsCount := uint16(0)
_ = sqlparser.Walk(func(node sqlparser.SQLNode) (bool, error) {
switch node := node.(type) {
case *sqlparser.SQLVal:
if strings.HasPrefix(string(node.Val), ":v") {
paramsCount++
}
}
return true, nil
}, statement)

if paramsCount > 0 {
prepare.ParamsCount = paramsCount
prepare.ParamsType = make([]int32, paramsCount)
prepare.BindVars = make(map[string]*querypb.BindVariable, paramsCount)
}

c.PrepareData[c.StatementID] = prepare

fld, err := handler.ComPrepare(c, queries[0])

if err != nil {
if werr := c.writeErrorPacketFromError(err); werr != nil {
// If we can't even write the error, we're done.
log.Error("Error writing query error to client %v: %v", c.ConnectionID, werr)
return werr
}
saifalharthi marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

if err := c.writePrepare(fld, c.PrepareData[c.StatementID]); err != nil {
return err
}

case ComStmtExecute:
queryStart := time.Now()
stmtID, _, err := c.parseComStmtExecute(c.PrepareData, data)
c.recycleReadPacket()

if stmtID != uint32(0) {
defer func() {
prepare := c.PrepareData[stmtID]
if prepare.BindVars != nil {
saifalharthi marked this conversation as resolved.
Show resolved Hide resolved
for k := range prepare.BindVars {
prepare.BindVars[k] = nil
}
}
}()
}

if err != nil {
if werr := c.writeErrorPacketFromError(err); werr != nil {
// If we can't even write the error, we're done.
log.Error("Error writing query error to client %v: %v", c.ConnectionID, werr)
return werr
}
return nil
}

fieldSent := false
// sendFinished is set if the response should just be an OK packet.
sendFinished := false
prepare := c.PrepareData[stmtID]
err = handler.ComStmtExecute(c, prepare, func(qr *sqltypes.Result) error {
if sendFinished {
// Failsafe: Unreachable if server is well-behaved.
return io.EOF
}

if !fieldSent {
fieldSent = true

if len(qr.Fields) == 0 {
sendFinished = true
// We should not send any more packets after this.
return c.writeOKPacket(qr.RowsAffected, qr.InsertID, c.StatusFlags, 0)
}
if err := c.writeFields(qr); err != nil {
return err
}
}

return c.writeBinaryRows(qr)
})

// If no field was sent, we expect an error.
if !fieldSent {
// This is just a failsafe. Should never happen.
if err == nil || err == io.EOF {
err = NewSQLErrorFromError(errors.New("unexpected: query ended without no results and no error"))
}
if werr := c.writeErrorPacketFromError(err); werr != nil {
// If we can't even write the error, we're done.
log.Errorf("Error writing query error to %s: %v", c, werr)
return werr
}
} else {
if err != nil {
// We can't send an error in the middle of a stream.
// All we can do is abort the send, which will cause a 2013.
log.Errorf("Error in the middle of a stream to %s: %v", c, err)
return err
}

// Send the end packet only sendFinished is false (results were streamed).
// In this case the affectedRows and lastInsertID are always 0 since it
// was a read operation.
if !sendFinished {
if err := c.writeEndResult(false, 0, 0, handler.WarningCount(c)); err != nil {
log.Errorf("Error writing result to %s: %v", c, err)
return err
}
}
}

timings.Record(queryTimingKey, queryStart)
case ComStmtSendLongData:
stmtID, paramID, chunkData, ok := c.parseComStmtSendLongData(data)
c.recycleReadPacket()
if !ok {
err := fmt.Errorf("error parsing statement send long data from client %v, returning error: %v", c.ConnectionID, data)
log.Error(err.Error())
return err
}

prepare, ok := c.PrepareData[stmtID]
if !ok {
err := fmt.Errorf("got wrong statement id from client %v, statement ID(%v) is not found from record", c.ConnectionID, stmtID)
log.Error(err.Error())
return err
}

if prepare.BindVars == nil ||
prepare.ParamsCount == uint16(0) ||
paramID >= prepare.ParamsCount {
err := fmt.Errorf("invalid parameter Number from client %v, statement: %v", c.ConnectionID, prepare.PrepareStmt)
log.Error(err.Error())
return err
}

chunk := make([]byte, len(chunkData))
copy(chunk, chunkData)

key := fmt.Sprintf("v%d", paramID+1)
if val, ok := prepare.BindVars[key]; ok {
val.Value = append(val.Value, chunk...)
} else {
prepare.BindVars[key] = sqltypes.BytesBindVariable(chunk)
}
case ComStmtClose:
stmtID, ok := c.parseComStmtClose(data)
c.recycleReadPacket()
if ok {
delete(c.PrepareData, stmtID)
}
case ComStmtReset:
stmtID, ok := c.parseComStmtReset(data)
c.recycleReadPacket()
if !ok {
log.Error("Got unhandled packet from client %v, returning error: %v", c.ConnectionID, data)
if err := c.writeErrorPacket(ERUnknownComError, SSUnknownComError, "error handling packet: %v", data); err != nil {
log.Error("Error writing error packet to client: %v", err)
return err
}
}

prepare, ok := c.PrepareData[stmtID]
if !ok {
log.Error("Commands were executed in an improper order from client %v, packet: %v", c.ConnectionID, data)
if err := c.writeErrorPacket(CRCommandsOutOfSync, SSUnknownComError, "commands were executed in an improper order: %v", data); err != nil {
log.Error("Error writing error packet to client: %v", err)
return err
}
}

if prepare.BindVars != nil {
for k := range prepare.BindVars {
prepare.BindVars[k] = nil
}
}

if err := c.writeOKPacket(0, 0, c.StatusFlags, 0); err != nil {
log.Error("Error writing ComStmtReset OK packet to client %v: %v", c.ConnectionID, err)
return err
}
default:
log.Errorf("Got unhandled packet (default) from %s, returning error: %v", c, data)
c.recycleReadPacket()
Expand Down Expand Up @@ -987,8 +1224,9 @@ func ParseErrorPacket(data []byte) error {
return NewSQLError(int(code), string(sqlState), "%v", msg)
}

func (conn *Conn) GetTLSClientCerts() []*x509.Certificate {
if tlsConn, ok := conn.conn.(*tls.Conn); ok {
// GetTLSClientCerts gets TLS certificates.
func (c *Conn) GetTLSClientCerts() []*x509.Certificate {
if tlsConn, ok := c.conn.(*tls.Conn); ok {
return tlsConn.ConnectionState().PeerCertificates
}
return nil
Expand Down
18 changes: 18 additions & 0 deletions go/mysql/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,24 @@ const (
// ComBinlogDump is COM_BINLOG_DUMP.
ComBinlogDump = 0x12

// ComPrepare is COM_PREPARE.
ComPrepare = 0x16

// ComStmtExecute is COM_STMT_EXECUTE.
ComStmtExecute = 0x17

// ComStmtSendLongData is COM_STMT_SEND_LONG_DATA
ComStmtSendLongData = 0x18

// ComStmtClose is COM_STMT_CLOSE.
ComStmtClose = 0x19

// ComStmtReset is COM_STMT_RESET
ComStmtReset = 0x1a

//ComStmtFetch is COM_STMT_FETCH
ComStmtFetch = 0x1c

// ComSetOption is COM_SET_OPTION
ComSetOption = 0x1b

Expand Down
12 changes: 12 additions & 0 deletions go/mysql/fakesqldb/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"

querypb "vitess.io/vitess/go/vt/proto/query"
)

const appendEntry = -1
Expand Down Expand Up @@ -432,6 +434,16 @@ func (db *DB) comQueryOrdered(query string) (*sqltypes.Result, error) {
return entry.QueryResult, nil
}

// ComPrepare is part of the mysql.Handler interface.
func (db *DB) ComPrepare(c *mysql.Conn, query string) ([]*querypb.Field, error) {
return nil, nil
}

// ComStmtExecute is part of the mysql.Handler interface.
func (db *DB) ComStmtExecute(c *mysql.Conn, prepare *mysql.PrepareData, callback func(*sqltypes.Result) error) error {
return nil
}

//
// Methods to add expected queries and results.
//
Expand Down
Loading