Skip to content

Commit

Permalink
added autocommit in vcursor
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal committed Aug 10, 2020
1 parent ba5ac9c commit 8469f1f
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 84 deletions.
10 changes: 7 additions & 3 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ func (t noopVCursor) Session() SessionActions {
return t
}

func (t noopVCursor) SetAutocommit(autocommit bool) error {
panic("implement me")
}

func (t noopVCursor) SetTarget(target string) error {
panic("implement me")
}
Expand Down Expand Up @@ -140,8 +144,6 @@ func (t noopVCursor) ResolveDestinations(keyspace string, ids []*querypb.Value,
panic("unimplemented")
}

func (t noopVCursor) SetAutocommit(bool) {}

var _ VCursor = (*loggingVCursor)(nil)

var _ SessionActions = (*loggingVCursor)(nil)
Expand Down Expand Up @@ -350,7 +352,9 @@ func (f *loggingVCursor) Rewind() {
f.warnings = nil
}

func (f *loggingVCursor) SetAutocommit(b bool) {}
func (f *loggingVCursor) SetAutocommit(b bool) error {
panic("implement me")
}

func (f *loggingVCursor) nextResult() (*sqltypes.Result, error) {
if f.results == nil || f.curResult >= len(f.results) {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ type (
// ShardSession returns shard info about open connections
ShardSession() []*srvtopo.ResolvedShard

SetAutocommit(bool)
SetAutocommit(bool) error
}

// Plan represents the execution strategy for a given query.
Expand Down
14 changes: 3 additions & 11 deletions go/vt/vtgate/engine/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ func (svs *SysVarSet) checkAndUpdateSysVar(vcursor VCursor, res evalengine.Expre

var _ SetOp = (*SysVarSetAware)(nil)

// System variables that needs special handling
const (
AUTOCOMMIT = "autocommit"
)
Expand All @@ -354,20 +355,11 @@ func (svss *SysVarSetAware) Execute(vcursor VCursor, env evalengine.ExpressionEn
if err != nil {
return err
}

autocommittable, err := value.ToBooleanStrict()
autocommit, err := value.ToBooleanStrict()
if err != nil {
return vterrors.Wrapf(err, "System setting '%s' can't be set to this value", svss.Name)
}

if autocommittable && vcursor.Session().InReservedConn() {
//TODO do it
//if err := conn.Commit(ctx, session); err != nil {
// return err
//}
}
vcursor.Session().SetAutocommit(autocommittable)

vcursor.Session().SetAutocommit(autocommit)
default:
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unsupported construct")
}
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,11 @@ func (e *Executor) handleCommit(ctx context.Context, safeSession *SafeSession, l
return &sqltypes.Result{}, err
}

//Commit commits the existing transactions
func (e *Executor) Commit(ctx context.Context, safeSession *SafeSession) error {
return e.txConn.Commit(ctx, safeSession)
}

func (e *Executor) handleRollback(ctx context.Context, safeSession *SafeSession, logStats *LogStats) (*sqltypes.Result, error) {
execStart := time.Now()
logStats.PlanTime = execStart.Sub(logStats.StartTime)
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vtgate/executor_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package vtgate

import (
"fmt"
"testing"

querypb "vitess.io/vitess/go/vt/proto/query"
Expand Down Expand Up @@ -237,8 +238,8 @@ func TestExecutorSet(t *testing.T) {
in: "set session transaction read write",
out: &vtgatepb.Session{Autocommit: true},
}}
for _, tcase := range testcases {
t.Run(tcase.in, func(t *testing.T) {
for i, tcase := range testcases {
t.Run(fmt.Sprintf("%d-%s", i, tcase.in), func(t *testing.T) {
session := NewSafeSession(&vtgatepb.Session{Autocommit: true})
_, err := executorEnv.Execute(context.Background(), "TestExecute", session, tcase.in, nil)
if tcase.err == "" {
Expand Down
92 changes: 27 additions & 65 deletions go/vt/vtgate/planbuilder/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,44 +202,8 @@ var checkAndIgnore = []setting{
{name: "version_tokens_session"},
}

var allowSetIfValueAlreadySet = []string{}

var vitessAware = []string{
engine.AUTOCOMMIT,
}

var vitessShouldBeAwareOf = []string{
"block_encryption_mode",
"character_set_client",
"character_set_connection",
"character_set_database",
"character_set_filesystem",
"character_set_server",
"collation_connection",
"collation_database",
"collation_server",
"completion_type",
"div_precision_increment",
"innodb_lock_wait_timeout",
"interactive_timeout",
"lc_time_names",
"lock_wait_timeout",
"max_allowed_packet",
"max_error_count",
"max_execution_time",
"max_join_size",
"max_length_for_sort_data",
"max_sort_length",
"max_user_connections",
"session_track_gtids",
"session_track_schema",
"session_track_state_change",
"session_track_system_variables",
"session_track_transaction_info",
"time_zone",
"transaction_isolation",
"version_tokens_session",
"sql_auto_is_null",
var vitessAware = []setting{
{name: engine.AUTOCOMMIT, boolean: true},
}

func init() {
Expand Down Expand Up @@ -354,26 +318,22 @@ func (ec *expressionConverter) source(vschema ContextVSchema) (engine.Primitive,
return primitive, nil
}

func buildNotSupported(bool) func(*sqlparser.SetExpr, ContextVSchema) (engine.SetOp, error) {
func buildNotSupported(bool) planFunc {
return func(expr *sqlparser.SetExpr, schema ContextVSchema, _ *expressionConverter) (engine.SetOp, error) {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "%s: system setting is not supported", expr.Name)
}
}

func buildSetOpIgnore(expr *sqlparser.SetExpr, _ ContextVSchema, _ *expressionConverter) (engine.SetOp, error) {
buf := sqlparser.NewTrackedBuffer(nil)
buf.Myprintf("%v", expr.Expr)

func buildSetOpIgnore(boolean bool) func(*sqlparser.SetExpr, ContextVSchema) (engine.SetOp, error) {
return func(expr *sqlparser.SetExpr, _ ContextVSchema) (engine.SetOp, error) {
func buildSetOpIgnore(boolean bool) planFunc {
return func(expr *sqlparser.SetExpr, vschema ContextVSchema, _ *expressionConverter) (engine.SetOp, error) {
return &engine.SysVarIgnore{
Name: expr.Name.Lowered(),
Expr: extractValue(expr, boolean),
}, nil
}
}

func buildSetOpCheckAndIgnore(boolean bool) func(*sqlparser.SetExpr, ContextVSchema) (engine.SetOp, error) {
func buildSetOpCheckAndIgnore(boolean bool) planFunc {
return func(expr *sqlparser.SetExpr, schema ContextVSchema, _ *expressionConverter) (engine.SetOp, error) {
keyspace, dest, err := resolveDestination(schema)
if err != nil {
Expand Down Expand Up @@ -406,7 +366,7 @@ func expressionOkToDelegateToTablet(e sqlparser.Expr) bool {
return valid
}

func buildSetOpVarSet(boolean bool) func(*sqlparser.SetExpr, ContextVSchema) (engine.SetOp, error) {
func buildSetOpVarSet(boolean bool) planFunc {
return func(expr *sqlparser.SetExpr, vschema ContextVSchema, _ *expressionConverter) (engine.SetOp, error) {
ks, err := vschema.AnyKeyspace()
if err != nil {
Expand All @@ -422,27 +382,29 @@ func buildSetOpVarSet(boolean bool) func(*sqlparser.SetExpr, ContextVSchema) (en
}
}

func buildSetOpVitessAware(expr *sqlparser.SetExpr, vschema ContextVSchema, ec *expressionConverter) (engine.SetOp, error) {
ks, err := vschema.AnyKeyspace()
if err != nil {
return nil, err
}

switch expr.Name.Lowered() {
case engine.AUTOCOMMIT:
convert, err := ec.convert(expr)
func buildSetOpVitessAware(boolean bool) planFunc {
return func(expr *sqlparser.SetExpr, vschema ContextVSchema, ec *expressionConverter) (engine.SetOp, error) {
ks, err := vschema.AnyKeyspace()
if err != nil {
return nil, err
}
return &engine.SysVarSetAware{
Name: expr.Name.Lowered(),
Keyspace: ks,
TargetDestination: vschema.Destination(),
Expr: convert,
OrigExpr: sqlparser.String(expr.Expr),
}, nil
default:
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unknown setting %s", expr.Name.String())

switch expr.Name.Lowered() {
case engine.AUTOCOMMIT:
convert, err := ec.convert(expr)
if err != nil {
return nil, err
}
return &engine.SysVarSetAware{
Name: expr.Name.Lowered(),
Keyspace: ks,
TargetDestination: vschema.Destination(),
Expr: convert,
OrigExpr: extractValue(expr, boolean),
}, nil
default:
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unknown setting %s", expr.Name.String())
}
}
}

Expand Down
49 changes: 49 additions & 0 deletions go/vt/vtgate/planbuilder/testdata/set_cases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,52 @@
]
}
}

# autocommit case
"SET autocommit = 1, autocommit = on, autocommit = 'on', autocommit = @myudv, autocommit = `on`"
{
"QueryType": "SET",
"Original": "SET autocommit = 1, autocommit = on, autocommit = 'on', autocommit = @myudv",
"Instructions": {
"OperatorType": "Set",
"Ops": [
{
"Name": "autocommit",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"OrigExpr": "1"
},
{
"Name": "autocommit",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"OrigExpr": "'on'"
},
{
"Name": "autocommit",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"OrigExpr": "'on'"
},
{
"Name": "autocommit",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"OrigExpr": ":__vtudvmyudv"
}
],
"Inputs": [
{
"OperatorType": "SingleRow"
}
]
}
}
11 changes: 9 additions & 2 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type iExecute interface {
Execute(ctx context.Context, method string, session *SafeSession, s string, vars map[string]*querypb.BindVariable) (*sqltypes.Result, error)
ExecuteMultiShard(ctx context.Context, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, session *SafeSession, autocommit bool, ignoreMaxMemoryRows bool) (qr *sqltypes.Result, errs []error)
StreamExecuteMulti(ctx context.Context, s string, rss []*srvtopo.ResolvedShard, vars []map[string]*querypb.BindVariable, options *querypb.ExecuteOptions, callback func(reply *sqltypes.Result) error) error
Commit(ctx context.Context, safeSession *SafeSession) error

// TODO: remove when resolver is gone
ParseDestinationTarget(targetString string) (string, topodatapb.TabletType, key.Destination, error)
Expand Down Expand Up @@ -467,8 +468,14 @@ func (vc *vcursorImpl) TargetDestination(qualifier string) (key.Destination, *vi
return vc.destination, keyspace.Keyspace, vc.tabletType, nil
}

func (vc *vcursorImpl) SetAutocommit(b bool) {
vc.safeSession.Autocommit = b
func (vc *vcursorImpl) SetAutocommit(autocommit bool) error {
if autocommit && vc.safeSession.InTransaction() {
if err := vc.executor.Commit(vc.ctx, vc.safeSession); err != nil {
return err
}
}
vc.safeSession.Autocommit = autocommit
return nil
}

// ParseDestinationTarget parses destination target string and sets default keyspace if possible.
Expand Down

0 comments on commit 8469f1f

Please sign in to comment.