Skip to content

Commit

Permalink
Merge pull request #6404 from planetscale/tablet-savepoint
Browse files Browse the repository at this point in the history
Allow Savepoint on VTTablet
  • Loading branch information
sougou authored Jul 9, 2020
2 parents 9f2b0fd + a8cb436 commit 8f7d891
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 0 deletions.
19 changes: 19 additions & 0 deletions go/vt/vttablet/endtoend/framework/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,25 @@ func (client *QueryClient) BeginExecute(query string, bindvars map[string]*query
return qr, nil
}

// BeginExecuteBatch performs a BeginExecuteBatch.
func (client *QueryClient) BeginExecuteBatch(queries []*querypb.BoundQuery, asTransaction bool) ([]sqltypes.Result, error) {
if client.transactionID != 0 {
return nil, errors.New("already in transaction")
}
qr, transactionID, _, err := client.server.BeginExecuteBatch(
client.ctx,
&client.target,
queries,
asTransaction,
&querypb.ExecuteOptions{IncludedFields: querypb.ExecuteOptions_ALL},
)
client.transactionID = transactionID
if err != nil {
return nil, err
}
return qr, nil
}

// ExecuteWithOptions executes a query using 'options'.
func (client *QueryClient) ExecuteWithOptions(query string, bindvars map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, error) {
return client.server.Execute(
Expand Down
157 changes: 157 additions & 0 deletions go/vt/vttablet/endtoend/savepoint_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
Copyright 2020 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package endtoend

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/vttablet/endtoend/framework"
)

func TestSavepointInTransactionWithSRollback(t *testing.T) {
client := framework.NewClient()
defer client.Execute("delete from vitess_test where intval = 5", nil)

queries := []*querypb.BoundQuery{
{
Sql: "savepoint a",
BindVariables: nil,
},
{
Sql: "insert into vitess_test (intval, floatval, charval, binval) values (5, null, null, null)",
BindVariables: nil,
},
}
_, err := client.BeginExecuteBatch(queries, false)
require.NoError(t, err)

qr, err := client.Execute("select intval from vitess_test where intval = 5", nil)
require.NoError(t, err)
require.Equal(t, "[[INT32(5)]]", fmt.Sprintf("%v", qr.Rows))

_, err = client.Execute("rollback to a", nil)
require.NoError(t, err)

err = client.Commit()
require.NoError(t, err)

qr, err = client.Execute("select intval from vitess_test where intval = 5", nil)
require.NoError(t, err)
require.Empty(t, qr.Rows)
}

func TestSavepointInTransactionWithRelease(t *testing.T) {
client := framework.NewClient()

vstart := framework.DebugVars()

queries := []*querypb.BoundQuery{
{
Sql: "savepoint a",
BindVariables: nil,
},
{
Sql: "insert into vitess_test (intval, floatval, charval, binval) values (5, null, null, null)",
BindVariables: nil,
},
}
_, err := client.BeginExecuteBatch(queries, false)
require.NoError(t, err)

qr, err := client.Execute("select intval from vitess_test where intval in (5, 6)", nil)
require.NoError(t, err)
require.Equal(t, "[[INT32(5)]]", fmt.Sprintf("%v", qr.Rows))

_, err = client.Execute("savepoint b", nil)
require.NoError(t, err)

_, err = client.Execute("insert into vitess_test (intval, floatval, charval, binval) values (6, null, null, null)", nil)
require.NoError(t, err)

qr, err = client.Execute("select intval from vitess_test where intval in (5, 6)", nil)
require.NoError(t, err)
require.Equal(t, "[[INT32(5)] [INT32(6)]]", fmt.Sprintf("%v", qr.Rows))

_, err = client.Execute("release savepoint b", nil)
require.NoError(t, err)

// After release savepoint does not exists
_, err = client.Execute("rollback to b", nil)
require.Error(t, err)

qr, err = client.Execute("select intval from vitess_test where intval in (5, 6)", nil)
require.NoError(t, err)
require.Equal(t, "[[INT32(5)] [INT32(6)]]", fmt.Sprintf("%v", qr.Rows))

_, err = client.Execute("rollback to a", nil)
require.NoError(t, err)

err = client.Commit()
require.NoError(t, err)

qr, err = client.Execute("select intval from vitess_test where intval in (5, 6)", nil)
require.NoError(t, err)
require.Empty(t, qr.Rows)

vend := framework.DebugVars()

expectedDiffs := []struct {
tag string
diff int
}{{
tag: "Queries/Histograms/Savepoint/Count",
diff: 2,
}, {
tag: "Queries/Histograms/Release/Count",
diff: 1,
}, {
tag: "Queries/Histograms/RollbackSavepoint/Count",
diff: 2,
}}
for _, expected := range expectedDiffs {
compareIntDiff(t, vend, expected.tag, vstart, expected.diff)
}
}

func TestSavepointWithoutTransaction(t *testing.T) {
client := framework.NewClient()
defer client.Execute("delete from vitess_test where intval = 6", nil)

_, err := client.Execute("savepoint a", nil)
require.NoError(t, err)

_, err = client.Execute("insert into vitess_test (intval, floatval, charval, binval) values (6, null, null, null)", nil)
require.NoError(t, err)

_, err = client.Execute("savepoint b", nil)
require.NoError(t, err)

qr, err := client.Execute("select intval from vitess_test where intval = 6", nil)
require.NoError(t, err)
require.Equal(t, "[[INT32(6)]]", fmt.Sprintf("%v", qr.Rows))

// Without transaction there is no savepoint.
_, err = client.Execute("release savepoint a", nil)
require.Error(t, err)

// Without transaction there is no savepoint.
_, err = client.Execute("rollback to a", nil)
require.Error(t, err)
}
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletserver/planbuilder/permission.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ func BuildPermissions(stmt sqlparser.Statement) []Permission {
// no op
case *sqlparser.Begin, *sqlparser.Commit, *sqlparser.Rollback:
// no op
case *sqlparser.Savepoint, *sqlparser.Release, *sqlparser.SRollback:
// no op
default:
panic(fmt.Errorf("BUG: unexpected statement type: %T", node))
}
Expand Down
12 changes: 12 additions & 0 deletions go/vt/vttablet/tabletserver/planbuilder/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ const (
PlanSelectStream
// PlanMessageStream is for "stream" statements.
PlanMessageStream
PlanSavepoint
PlanRelease
PlanSRollback
NumPlans
)

Expand All @@ -82,6 +85,9 @@ var planName = [NumPlans]string{
"OtherAdmin",
"SelectStream",
"MessageStream",
"Savepoint",
"Release",
"RollbackSavepoint",
}

func (pt PlanType) String() string {
Expand Down Expand Up @@ -180,6 +186,12 @@ func Build(statement sqlparser.Statement, tables map[string]*schema.Table) (*Pla
plan, err = &Plan{PlanID: PlanOtherRead}, nil
case *sqlparser.OtherAdmin:
plan, err = &Plan{PlanID: PlanOtherAdmin}, nil
case *sqlparser.Savepoint:
plan, err = &Plan{PlanID: PlanSavepoint}, nil
case *sqlparser.Release:
plan, err = &Plan{PlanID: PlanRelease}, nil
case *sqlparser.SRollback:
plan, err = &Plan{PlanID: PlanSRollback}, nil
default:
return nil, vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "invalid SQL")
}
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "%s disallowed outside transaction", qre.plan.PlanID.String())
case planbuilder.PlanSet, planbuilder.PlanOtherRead, planbuilder.PlanOtherAdmin:
return qre.execOther()
case planbuilder.PlanSavepoint, planbuilder.PlanRelease, planbuilder.PlanSRollback:
return qre.execOther()
case planbuilder.PlanInsert, planbuilder.PlanUpdate, planbuilder.PlanDelete, planbuilder.PlanInsertMessage, planbuilder.PlanDDL:
return qre.execAutocommit(qre.txConnExec)
case planbuilder.PlanUpdateLimit, planbuilder.PlanDeleteLimit:
Expand Down Expand Up @@ -200,6 +202,8 @@ func (qre *QueryExecutor) txConnExec(conn *StatefulConnection) (*sqltypes.Result
return qre.execDMLLimit(conn)
case planbuilder.PlanSet, planbuilder.PlanOtherRead, planbuilder.PlanOtherAdmin:
return qre.execSQL(conn, qre.query, true)
case planbuilder.PlanSavepoint, planbuilder.PlanRelease, planbuilder.PlanSRollback:
return qre.execSQL(conn, qre.query, true)
case planbuilder.PlanSelect, planbuilder.PlanSelectLock, planbuilder.PlanSelectImpossible:
maxrows := qre.getSelectLimit()
qre.bindVars["#maxLimit"] = sqltypes.Int64BindVariable(maxrows + 1)
Expand Down
31 changes: 31 additions & 0 deletions go/vt/vttablet/tabletserver/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func TestQueryExecutorPlans(t *testing.T) {
fields := sqltypes.MakeTestFields("a|b", "int64|varchar")
fieldResult := sqltypes.MakeTestResult(fields)
selectResult := sqltypes.MakeTestResult(fields, "1|aaa")
emptyResult := &sqltypes.Result{}

// The queries are run both in and outside a transaction.
testcases := []struct {
Expand Down Expand Up @@ -208,6 +209,36 @@ func TestQueryExecutorPlans(t *testing.T) {
resultWant: dmlResult,
planWant: "DDL",
logWant: "alter table test_table add zipcode int",
}, {
input: "savepoint a",
dbResponses: []dbResponse{{
query: "savepoint a",
result: emptyResult,
}},
resultWant: emptyResult,
planWant: "Savepoint",
logWant: "savepoint a",
inTxWant: "savepoint a",
}, {
input: "ROLLBACK work to SAVEPOINT a",
dbResponses: []dbResponse{{
query: "ROLLBACK work to SAVEPOINT a",
result: emptyResult,
}},
resultWant: emptyResult,
planWant: "RollbackSavepoint",
logWant: "ROLLBACK work to SAVEPOINT a",
inTxWant: "ROLLBACK work to SAVEPOINT a",
}, {
input: "RELEASE savepoint a",
dbResponses: []dbResponse{{
query: "RELEASE savepoint a",
result: emptyResult,
}},
resultWant: emptyResult,
planWant: "Release",
logWant: "RELEASE savepoint a",
inTxWant: "RELEASE savepoint a",
}}
for _, tcase := range testcases {
func() {
Expand Down

0 comments on commit 8f7d891

Please sign in to comment.