From 8469f1fff71618f32f23e360b31c8fabb99efc14 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 10 Aug 2020 13:38:13 +0530 Subject: [PATCH] added autocommit in vcursor Signed-off-by: Harshit Gangal --- go/vt/vtgate/engine/fake_vcursor_test.go | 10 +- go/vt/vtgate/engine/primitive.go | 2 +- go/vt/vtgate/engine/set.go | 14 +-- go/vt/vtgate/executor.go | 5 + go/vt/vtgate/executor_set_test.go | 5 +- go/vt/vtgate/planbuilder/set.go | 92 ++++++------------- .../vtgate/planbuilder/testdata/set_cases.txt | 49 ++++++++++ go/vt/vtgate/vcursor_impl.go | 11 ++- 8 files changed, 104 insertions(+), 84 deletions(-) diff --git a/go/vt/vtgate/engine/fake_vcursor_test.go b/go/vt/vtgate/engine/fake_vcursor_test.go index 74866c3c73d..2c4ff607277 100644 --- a/go/vt/vtgate/engine/fake_vcursor_test.go +++ b/go/vt/vtgate/engine/fake_vcursor_test.go @@ -81,6 +81,10 @@ func (t noopVCursor) Session() SessionActions { return t } +func (t noopVCursor) SetAutocommit(autocommit bool) error { + panic("implement me") +} + func (t noopVCursor) SetTarget(target string) error { panic("implement me") } @@ -140,8 +144,6 @@ func (t noopVCursor) ResolveDestinations(keyspace string, ids []*querypb.Value, panic("unimplemented") } -func (t noopVCursor) SetAutocommit(bool) {} - var _ VCursor = (*loggingVCursor)(nil) var _ SessionActions = (*loggingVCursor)(nil) @@ -350,7 +352,9 @@ func (f *loggingVCursor) Rewind() { f.warnings = nil } -func (f *loggingVCursor) SetAutocommit(b bool) {} +func (f *loggingVCursor) SetAutocommit(b bool) error { + panic("implement me") +} func (f *loggingVCursor) nextResult() (*sqltypes.Result, error) { if f.results == nil || f.curResult >= len(f.results) { diff --git a/go/vt/vtgate/engine/primitive.go b/go/vt/vtgate/engine/primitive.go index e97a04ade0b..9d9a69b42b4 100644 --- a/go/vt/vtgate/engine/primitive.go +++ b/go/vt/vtgate/engine/primitive.go @@ -106,7 +106,7 @@ type ( // ShardSession returns shard info about open connections ShardSession() []*srvtopo.ResolvedShard - SetAutocommit(bool) + SetAutocommit(bool) error } // Plan represents the execution strategy for a given query. diff --git a/go/vt/vtgate/engine/set.go b/go/vt/vtgate/engine/set.go index fdd2612a8ca..6402ac8b5a5 100644 --- a/go/vt/vtgate/engine/set.go +++ b/go/vt/vtgate/engine/set.go @@ -342,6 +342,7 @@ func (svs *SysVarSet) checkAndUpdateSysVar(vcursor VCursor, res evalengine.Expre var _ SetOp = (*SysVarSetAware)(nil) +// System variables that needs special handling const ( AUTOCOMMIT = "autocommit" ) @@ -354,20 +355,11 @@ func (svss *SysVarSetAware) Execute(vcursor VCursor, env evalengine.ExpressionEn if err != nil { return err } - - autocommittable, err := value.ToBooleanStrict() + autocommit, err := value.ToBooleanStrict() if err != nil { return vterrors.Wrapf(err, "System setting '%s' can't be set to this value", svss.Name) } - - if autocommittable && vcursor.Session().InReservedConn() { - //TODO do it - //if err := conn.Commit(ctx, session); err != nil { - // return err - //} - } - vcursor.Session().SetAutocommit(autocommittable) - + vcursor.Session().SetAutocommit(autocommit) default: return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unsupported construct") } diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 780414b0c43..a18615e1e0c 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -293,6 +293,11 @@ func (e *Executor) handleCommit(ctx context.Context, safeSession *SafeSession, l return &sqltypes.Result{}, err } +//Commit commits the existing transactions +func (e *Executor) Commit(ctx context.Context, safeSession *SafeSession) error { + return e.txConn.Commit(ctx, safeSession) +} + func (e *Executor) handleRollback(ctx context.Context, safeSession *SafeSession, logStats *LogStats) (*sqltypes.Result, error) { execStart := time.Now() logStats.PlanTime = execStart.Sub(logStats.StartTime) diff --git a/go/vt/vtgate/executor_set_test.go b/go/vt/vtgate/executor_set_test.go index 250763eb72e..eff4c05592e 100644 --- a/go/vt/vtgate/executor_set_test.go +++ b/go/vt/vtgate/executor_set_test.go @@ -17,6 +17,7 @@ limitations under the License. package vtgate import ( + "fmt" "testing" querypb "vitess.io/vitess/go/vt/proto/query" @@ -237,8 +238,8 @@ func TestExecutorSet(t *testing.T) { in: "set session transaction read write", out: &vtgatepb.Session{Autocommit: true}, }} - for _, tcase := range testcases { - t.Run(tcase.in, func(t *testing.T) { + for i, tcase := range testcases { + t.Run(fmt.Sprintf("%d-%s", i, tcase.in), func(t *testing.T) { session := NewSafeSession(&vtgatepb.Session{Autocommit: true}) _, err := executorEnv.Execute(context.Background(), "TestExecute", session, tcase.in, nil) if tcase.err == "" { diff --git a/go/vt/vtgate/planbuilder/set.go b/go/vt/vtgate/planbuilder/set.go index 2b482946ecd..1df2ad0dfde 100644 --- a/go/vt/vtgate/planbuilder/set.go +++ b/go/vt/vtgate/planbuilder/set.go @@ -202,44 +202,8 @@ var checkAndIgnore = []setting{ {name: "version_tokens_session"}, } -var allowSetIfValueAlreadySet = []string{} - -var vitessAware = []string{ - engine.AUTOCOMMIT, -} - -var vitessShouldBeAwareOf = []string{ - "block_encryption_mode", - "character_set_client", - "character_set_connection", - "character_set_database", - "character_set_filesystem", - "character_set_server", - "collation_connection", - "collation_database", - "collation_server", - "completion_type", - "div_precision_increment", - "innodb_lock_wait_timeout", - "interactive_timeout", - "lc_time_names", - "lock_wait_timeout", - "max_allowed_packet", - "max_error_count", - "max_execution_time", - "max_join_size", - "max_length_for_sort_data", - "max_sort_length", - "max_user_connections", - "session_track_gtids", - "session_track_schema", - "session_track_state_change", - "session_track_system_variables", - "session_track_transaction_info", - "time_zone", - "transaction_isolation", - "version_tokens_session", - "sql_auto_is_null", +var vitessAware = []setting{ + {name: engine.AUTOCOMMIT, boolean: true}, } func init() { @@ -354,18 +318,14 @@ func (ec *expressionConverter) source(vschema ContextVSchema) (engine.Primitive, return primitive, nil } -func buildNotSupported(bool) func(*sqlparser.SetExpr, ContextVSchema) (engine.SetOp, error) { +func buildNotSupported(bool) planFunc { return func(expr *sqlparser.SetExpr, schema ContextVSchema, _ *expressionConverter) (engine.SetOp, error) { return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "%s: system setting is not supported", expr.Name) } } -func buildSetOpIgnore(expr *sqlparser.SetExpr, _ ContextVSchema, _ *expressionConverter) (engine.SetOp, error) { - buf := sqlparser.NewTrackedBuffer(nil) - buf.Myprintf("%v", expr.Expr) - -func buildSetOpIgnore(boolean bool) func(*sqlparser.SetExpr, ContextVSchema) (engine.SetOp, error) { - return func(expr *sqlparser.SetExpr, _ ContextVSchema) (engine.SetOp, error) { +func buildSetOpIgnore(boolean bool) planFunc { + return func(expr *sqlparser.SetExpr, vschema ContextVSchema, _ *expressionConverter) (engine.SetOp, error) { return &engine.SysVarIgnore{ Name: expr.Name.Lowered(), Expr: extractValue(expr, boolean), @@ -373,7 +333,7 @@ func buildSetOpIgnore(boolean bool) func(*sqlparser.SetExpr, ContextVSchema) (en } } -func buildSetOpCheckAndIgnore(boolean bool) func(*sqlparser.SetExpr, ContextVSchema) (engine.SetOp, error) { +func buildSetOpCheckAndIgnore(boolean bool) planFunc { return func(expr *sqlparser.SetExpr, schema ContextVSchema, _ *expressionConverter) (engine.SetOp, error) { keyspace, dest, err := resolveDestination(schema) if err != nil { @@ -406,7 +366,7 @@ func expressionOkToDelegateToTablet(e sqlparser.Expr) bool { return valid } -func buildSetOpVarSet(boolean bool) func(*sqlparser.SetExpr, ContextVSchema) (engine.SetOp, error) { +func buildSetOpVarSet(boolean bool) planFunc { return func(expr *sqlparser.SetExpr, vschema ContextVSchema, _ *expressionConverter) (engine.SetOp, error) { ks, err := vschema.AnyKeyspace() if err != nil { @@ -422,27 +382,29 @@ func buildSetOpVarSet(boolean bool) func(*sqlparser.SetExpr, ContextVSchema) (en } } -func buildSetOpVitessAware(expr *sqlparser.SetExpr, vschema ContextVSchema, ec *expressionConverter) (engine.SetOp, error) { - ks, err := vschema.AnyKeyspace() - if err != nil { - return nil, err - } - - switch expr.Name.Lowered() { - case engine.AUTOCOMMIT: - convert, err := ec.convert(expr) +func buildSetOpVitessAware(boolean bool) planFunc { + return func(expr *sqlparser.SetExpr, vschema ContextVSchema, ec *expressionConverter) (engine.SetOp, error) { + ks, err := vschema.AnyKeyspace() if err != nil { return nil, err } - return &engine.SysVarSetAware{ - Name: expr.Name.Lowered(), - Keyspace: ks, - TargetDestination: vschema.Destination(), - Expr: convert, - OrigExpr: sqlparser.String(expr.Expr), - }, nil - default: - return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unknown setting %s", expr.Name.String()) + + switch expr.Name.Lowered() { + case engine.AUTOCOMMIT: + convert, err := ec.convert(expr) + if err != nil { + return nil, err + } + return &engine.SysVarSetAware{ + Name: expr.Name.Lowered(), + Keyspace: ks, + TargetDestination: vschema.Destination(), + Expr: convert, + OrigExpr: extractValue(expr, boolean), + }, nil + default: + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unknown setting %s", expr.Name.String()) + } } } diff --git a/go/vt/vtgate/planbuilder/testdata/set_cases.txt b/go/vt/vtgate/planbuilder/testdata/set_cases.txt index 4b2372b5bcf..0cebd941b4b 100644 --- a/go/vt/vtgate/planbuilder/testdata/set_cases.txt +++ b/go/vt/vtgate/planbuilder/testdata/set_cases.txt @@ -168,3 +168,52 @@ ] } } + +# autocommit case +"SET autocommit = 1, autocommit = on, autocommit = 'on', autocommit = @myudv, autocommit = `on`" +{ + "QueryType": "SET", + "Original": "SET autocommit = 1, autocommit = on, autocommit = 'on', autocommit = @myudv", + "Instructions": { + "OperatorType": "Set", + "Ops": [ + { + "Name": "autocommit", + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "OrigExpr": "1" + }, + { + "Name": "autocommit", + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "OrigExpr": "'on'" + }, + { + "Name": "autocommit", + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "OrigExpr": "'on'" + }, + { + "Name": "autocommit", + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "OrigExpr": ":__vtudvmyudv" + } + ], + "Inputs": [ + { + "OperatorType": "SingleRow" + } + ] + } +} diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index dab6e0fa36f..e6a92fd259f 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -60,6 +60,7 @@ type iExecute interface { Execute(ctx context.Context, method string, session *SafeSession, s string, vars map[string]*querypb.BindVariable) (*sqltypes.Result, error) ExecuteMultiShard(ctx context.Context, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, session *SafeSession, autocommit bool, ignoreMaxMemoryRows bool) (qr *sqltypes.Result, errs []error) StreamExecuteMulti(ctx context.Context, s string, rss []*srvtopo.ResolvedShard, vars []map[string]*querypb.BindVariable, options *querypb.ExecuteOptions, callback func(reply *sqltypes.Result) error) error + Commit(ctx context.Context, safeSession *SafeSession) error // TODO: remove when resolver is gone ParseDestinationTarget(targetString string) (string, topodatapb.TabletType, key.Destination, error) @@ -467,8 +468,14 @@ func (vc *vcursorImpl) TargetDestination(qualifier string) (key.Destination, *vi return vc.destination, keyspace.Keyspace, vc.tabletType, nil } -func (vc *vcursorImpl) SetAutocommit(b bool) { - vc.safeSession.Autocommit = b +func (vc *vcursorImpl) SetAutocommit(autocommit bool) error { + if autocommit && vc.safeSession.InTransaction() { + if err := vc.executor.Commit(vc.ctx, vc.safeSession); err != nil { + return err + } + } + vc.safeSession.Autocommit = autocommit + return nil } // ParseDestinationTarget parses destination target string and sets default keyspace if possible.