Skip to content

Commit

Permalink
pass gomysql result from backend TiDB to server (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
teckick authored Oct 20, 2020
1 parent 899fe1d commit 3addc39
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 26 deletions.
3 changes: 2 additions & 1 deletion pkg/proxy/driver/queryctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/pingcap/parser/auth"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util"
gomysql "github.com/siddontang/go-mysql/mysql"
)

// Server information.
Expand Down Expand Up @@ -97,7 +98,7 @@ func (q *QueryCtxImpl) CurrentDB() string {
return q.currentDB
}

func (q *QueryCtxImpl) Execute(ctx context.Context, sql string) ([]server.ResultSet, error) {
func (q *QueryCtxImpl) Execute(ctx context.Context, sql string) (*gomysql.Result, error) {
return q.execute(ctx, sql)
}

Expand Down
25 changes: 10 additions & 15 deletions pkg/proxy/driver/queryctx_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"strings"

"github.com/pingcap-incubator/weir/pkg/proxy/server"
"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/mysql"
Expand All @@ -14,7 +13,7 @@ import (
"go.uber.org/zap"
)

func (q *QueryCtxImpl) execute(ctx context.Context, sql string) ([]server.ResultSet, error) {
func (q *QueryCtxImpl) execute(ctx context.Context, sql string) (*gomysql.Result, error) {
charsetInfo, collation := q.sessionVars.GetCharsetInfo()
stmt, err := q.parser.ParseOneStmt(sql, charsetInfo, collation)
if err != nil {
Expand All @@ -33,7 +32,7 @@ func (q *QueryCtxImpl) isStmtDenied(ctx context.Context, sql string, stmtNode as
return false
}

func (q *QueryCtxImpl) executeStmt(ctx context.Context, sql string, stmtNode ast.StmtNode) ([]server.ResultSet, error) {
func (q *QueryCtxImpl) executeStmt(ctx context.Context, sql string, stmtNode ast.StmtNode) (*gomysql.Result, error) {
switch stmt := stmtNode.(type) {
case *ast.SetStmt:
return nil, q.setVariable(ctx, stmt)
Expand All @@ -52,15 +51,12 @@ func (q *QueryCtxImpl) executeStmt(ctx context.Context, sql string, stmtNode ast
}
}

func (q *QueryCtxImpl) executeShowStmt(ctx context.Context, sql string, stmt *ast.ShowStmt) ([]server.ResultSet, error) {
func (q *QueryCtxImpl) executeShowStmt(ctx context.Context, sql string, stmt *ast.ShowStmt) (*gomysql.Result, error) {
switch stmt.Tp {
case ast.ShowDatabases:
databases := q.ns.ListDatabases()
result, err := createShowDatabasesResult(databases)
if err != nil {
return nil, err
}
return []server.ResultSet{wrapMySQLResult(result)}, nil
return result, err
default:
return q.executeInBackend(ctx, sql, stmt)
}
Expand Down Expand Up @@ -103,15 +99,15 @@ func createShowDatabasesResult(dbNames []string) (*gomysql.Result, error) {
return result, nil
}

func (q *QueryCtxImpl) executeInBackend(ctx context.Context, sql string, stmtNode ast.StmtNode) ([]server.ResultSet, error) {
func (q *QueryCtxImpl) executeInBackend(ctx context.Context, sql string, stmtNode ast.StmtNode) (*gomysql.Result, error) {
if !q.isAutoCommit() || q.isInTransaction() {
return q.executeInTxnConn(ctx, sql, stmtNode)
} else {
return q.executeInNoTxnConn(ctx, sql, stmtNode)
}
}

func (q *QueryCtxImpl) executeInTxnConn(ctx context.Context, sql string, stmtNode ast.StmtNode) ([]server.ResultSet, error) {
func (q *QueryCtxImpl) executeInTxnConn(ctx context.Context, sql string, stmtNode ast.StmtNode) (*gomysql.Result, error) {
q.txnLock.Lock()
defer q.txnLock.Unlock()

Expand All @@ -124,12 +120,12 @@ func (q *QueryCtxImpl) executeInTxnConn(ctx context.Context, sql string, stmtNod
return nil, err
}

var ret []server.ResultSet
var ret *gomysql.Result
ret, err = q.executeInBackendConn(ctx, q.txnConn, q.currentDB, sql, stmtNode)
return ret, err
}

func (q *QueryCtxImpl) executeInNoTxnConn(ctx context.Context, sql string, stmtNode ast.StmtNode) ([]server.ResultSet, error) {
func (q *QueryCtxImpl) executeInNoTxnConn(ctx context.Context, sql string, stmtNode ast.StmtNode) (*gomysql.Result, error) {
conn, err := q.ns.GetPooledConn(ctx)
if err != nil {
return nil, err
Expand All @@ -139,7 +135,7 @@ func (q *QueryCtxImpl) executeInNoTxnConn(ctx context.Context, sql string, stmtN
return q.executeInBackendConn(ctx, conn, q.currentDB, sql, stmtNode)
}

func (q *QueryCtxImpl) executeInBackendConn(ctx context.Context, conn PooledBackendConn, db string, sql string, stmtNode ast.StmtNode) ([]server.ResultSet, error) {
func (q *QueryCtxImpl) executeInBackendConn(ctx context.Context, conn PooledBackendConn, db string, sql string, stmtNode ast.StmtNode) (*gomysql.Result, error) {
if err := conn.UseDB(db); err != nil {
return nil, err
}
Expand All @@ -155,8 +151,7 @@ func (q *QueryCtxImpl) executeInBackendConn(ctx context.Context, conn PooledBack
return nil, nil
}

resultSet := wrapMySQLResult(result)
return []server.ResultSet{resultSet}, nil
return result, nil
}

func (q *QueryCtxImpl) useDB(ctx context.Context, db string) error {
Expand Down
77 changes: 77 additions & 0 deletions pkg/proxy/server/conn_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
gomysql "github.com/siddontang/go-mysql/mysql"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -202,6 +203,82 @@ func (cc *clientConn) writeResultset(ctx context.Context, rs ResultSet, binary b
return cc.flush()
}

func (cc *clientConn) writeGoMySQLResultset(ctx context.Context, rs *gomysql.Resultset, binary bool, serverStatus uint16, fetchSize int) (runErr error) {
defer func() {
// close ResultSet when cursor doesn't exist
if !mysql.HasCursorExistsFlag(serverStatus) {
// TODO(eastfisher): close resultset
}
r := recover()
if r == nil {
return
}
if str, ok := r.(string); !ok || !strings.HasPrefix(str, memory.PanicMemoryExceed) {
panic(r)
}
// TODO(jianzhang.zj: add metrics here)
runErr = errors.Errorf("%v", r)
buf := make([]byte, 4096)
stackSize := runtime.Stack(buf, false)
buf = buf[:stackSize]
logutil.Logger(ctx).Error("write query result panic", zap.Stringer("lastSQL", getLastStmtInConn{cc}), zap.String("stack", string(buf)))
}()
var err error

if mysql.HasCursorExistsFlag(serverStatus) {
// TODO(eastfisher): writeChunksWithFetchSize
} else {
err = cc.doWriteGoMySQLResultset(ctx, rs, binary, serverStatus)
}
if err != nil {
return err
}

return cc.flush()
}

func (cc *clientConn) doWriteGoMySQLResultset(ctx context.Context, rs *gomysql.Resultset, binary bool, serverStatus uint16) error {
var err error
columns := convertFieldsToColumnInfos(rs.Fields)
err = cc.writeColumnInfo(columns, serverStatus)
if err != nil {
return err
}

data := cc.alloc.AllocWithLen(4, 1024)
for _, v := range rs.RowDatas {
data = data[0:4]
// TODO(eastfisher): implement binary resultset
data = append(data, v...)
if err = cc.writePacket(data); err != nil {
return err
}
}
return cc.writeEOF(serverStatus)
}

func convertFieldsToColumnInfos(fields []*gomysql.Field) []*ColumnInfo {
var rets []*ColumnInfo
for _, f := range fields {
ret := &ColumnInfo{
Schema: string(f.Schema),
Table: string(f.Table),
OrgTable: string(f.OrgTable),
Name: string(f.Name),
OrgName: string(f.OrgName),
ColumnLength: f.ColumnLength,
Charset: f.Charset,
Flag: f.Flag,
Decimal: f.Decimal,
Type: f.Type,
DefaultValueLength: f.DefaultValueLength,
DefaultValue: f.DefaultValue,
}
rets = append(rets, ret)
}
return rets
}

func (cc *clientConn) writeColumnInfo(columns []*ColumnInfo, serverStatus uint16) error {
data := cc.alloc.AllocWithLen(4, 1024)
data = dumpLengthEncodedInt(data, uint64(len(columns)))
Expand Down
12 changes: 3 additions & 9 deletions pkg/proxy/server/conn_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/opentracing/opentracing-go"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -152,17 +151,12 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {
}
status := atomic.LoadInt32(&cc.status)
if rss != nil && (status == connStatusShutdown || status == connStatusWaitShutdown) {
for _, rs := range rss {
terror.Call(rs.Close)
}
// TODO(eastfisher): close ResultSet
return executor.ErrQueryInterrupted
}

if rss != nil {
if len(rss) == 1 {
err = cc.writeResultset(ctx, rss[0], false, 0, 0)
} else {
err = cc.writeMultiResultset(ctx, rss, false)
}
err = cc.writeGoMySQLResultset(ctx, rss.Resultset, false, 0 ,0)
} else {
// (eastfisher)currently we does not support load data
err = cc.writeOK()
Expand Down
3 changes: 2 additions & 1 deletion pkg/proxy/server/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/siddontang/go-mysql/mysql"
)

// IDriver opens IContext.
Expand Down Expand Up @@ -67,7 +68,7 @@ type QueryCtx interface {
CurrentDB() string

// Execute executes a SQL statement.
Execute(ctx context.Context, sql string) ([]ResultSet, error)
Execute(ctx context.Context, sql string) (*mysql.Result, error)

// ExecuteInternal executes a internal SQL statement.
ExecuteInternal(ctx context.Context, sql string) ([]ResultSet, error)
Expand Down

0 comments on commit 3addc39

Please sign in to comment.