Skip to content

Commit

Permalink
proxy: add autocommit=0 to start txn radondb#298
Browse files Browse the repository at this point in the history
  • Loading branch information
BohuTANG authored and andyli029 committed Aug 28, 2019
1 parent eb10acc commit ec27550
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 2 deletions.
2 changes: 0 additions & 2 deletions src/proxy/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,14 +284,12 @@ func (spanner *Spanner) ComQuery(session *driver.Session, query string, bindVari
spanner.auditLog(session, R, xbase.RADON, query, qr)
return returnQuery(qr, callback, err)
case *sqlparser.Set:
log.Warning("proxy.query.set.query:%s", query)
if qr, err = spanner.handleSet(session, query, node); err != nil {
log.Error("proxy.set[%s].from.session[%v].error:%+v", query, session.ID(), err)
}
spanner.auditLog(session, R, xbase.SET, query, qr)
return returnQuery(qr, callback, err)
case *sqlparser.Checksum:
log.Warning("proxy.query.checksum.query:%s", query)
if qr, err = spanner.handleChecksumTable(session, query, node); err != nil {
log.Error("proxy.checksum[%s].from.session[%v].error:%+v", query, session.ID(), err)
}
Expand Down
29 changes: 29 additions & 0 deletions src/proxy/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ import (
)

const (
var_mysql_autocommit = "autocommit"
var_radon_streaming_fetch = "radon_streaming_fetch"
)

// handleSet used to handle the SET command.
func (spanner *Spanner) handleSet(session *driver.Session, query string, node *sqlparser.Set) (*sqltypes.Result, error) {
log := spanner.log
txSession := spanner.sessions.getTxnSession(session)

for _, expr := range node.Exprs {
name := expr.Name.Lowered()
if strings.HasPrefix(name, "@@session.") {
Expand Down Expand Up @@ -53,6 +56,32 @@ func (spanner *Spanner) handleSet(session *driver.Session, query string, node *s
txSession.setStreamingFetchVar(false)
}
}
case var_mysql_autocommit:
var autocommit = true

switch expr := expr.Expr.(type) {
case *sqlparser.SQLVal:
switch expr.Type {
case sqlparser.IntVal:
if expr.Val[0]=='0' {
autocommit = false
}
}
}
if !autocommit {
query := "begin"
node := &sqlparser.Transaction{
Action: "begin",
}
qr, err := spanner.handleMultiStmtTxn(session, query, node)
if err != nil {
log.Error("proxy.transaction[%s](by.autocommit).from.session[%v].error:%+v", query, session.ID(), err)
return nil, err
}
return qr, nil
}
default:
log.Warning("unhandle.set[%v]:%v", name, query)
}
}
qr := &sqltypes.Result{Warnings: 1}
Expand Down
43 changes: 43 additions & 0 deletions src/proxy/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,46 @@ func TestProxySet(t *testing.T) {
}
}
}

func TestProxySetAutocommit(t *testing.T) {
log := xlog.NewStdLog(xlog.Level(xlog.DEBUG))
fakedbs, proxy, cleanup := MockProxy(log)
defer cleanup()
address := proxy.Address()

// fakedbs.
{
proxy.conf.Proxy.TwopcEnable = true
fakedbs.AddQueryPattern("create .*", &sqltypes.Result{})
fakedbs.AddQueryPattern("select .*", &sqltypes.Result{})
fakedbs.AddQueryPattern("xa .*", &sqltypes.Result{})
}

// set.
{
client, err := driver.NewConn("mock", "mock", address, "", "utf8")
assert.Nil(t, err)
{
query := "set autocommit=0"
_, err := client.FetchAll(query, -1)
assert.Nil(t, err)

query = "select 1"
_, err = client.FetchAll(query, -1)
assert.Nil(t, err)

query = "commit"
_, err = client.FetchAll(query, -1)
assert.Nil(t, err)
}
{
query := "set autocommit=1"
_, err := client.FetchAll(query, -1)
assert.Nil(t, err)

query = "commit"
_, err = client.FetchAll(query, -1)
assert.NotNil(t, err)
}
}
}

0 comments on commit ec27550

Please sign in to comment.