diff --git a/go/cmd/vtctld/schema.go b/go/cmd/vtctld/schema.go index df93cbc393c..55abd11d344 100644 --- a/go/cmd/vtctld/schema.go +++ b/go/cmd/vtctld/schema.go @@ -72,7 +72,7 @@ func initSchema() { err = schemamanager.Run( ctx, controller, - schemamanager.NewTabletExecutor(wr, *schemaChangeReplicasTimeout), + schemamanager.NewTabletExecutor("vtctld/schema", wr, *schemaChangeReplicasTimeout), ) if err != nil { log.Errorf("Schema change failed, error: %v", err) diff --git a/go/vt/schema/online_ddl.go b/go/vt/schema/online_ddl.go index e6a1945a724..0a836581562 100644 --- a/go/vt/schema/online_ddl.go +++ b/go/vt/schema/online_ddl.go @@ -89,17 +89,18 @@ const ( // OnlineDDL encapsulates the relevant information in an online schema change request type OnlineDDL struct { - Keyspace string `json:"keyspace,omitempty"` - Table string `json:"table,omitempty"` - Schema string `json:"schema,omitempty"` - SQL string `json:"sql,omitempty"` - UUID string `json:"uuid,omitempty"` - Strategy DDLStrategy `json:"strategy,omitempty"` - Options string `json:"options,omitempty"` - RequestTime int64 `json:"time_created,omitempty"` - Status OnlineDDLStatus `json:"status,omitempty"` - TabletAlias string `json:"tablet,omitempty"` - Retries int64 `json:"retries,omitempty"` + Keyspace string `json:"keyspace,omitempty"` + Table string `json:"table,omitempty"` + Schema string `json:"schema,omitempty"` + SQL string `json:"sql,omitempty"` + UUID string `json:"uuid,omitempty"` + Strategy DDLStrategy `json:"strategy,omitempty"` + Options string `json:"options,omitempty"` + RequestTime int64 `json:"time_created,omitempty"` + RequestContext string `json:"context,omitempty"` + Status OnlineDDLStatus `json:"status,omitempty"` + TabletAlias string `json:"tablet,omitempty"` + Retries int64 `json:"retries,omitempty"` } // ParseDDLStrategy validates the given ddl_strategy variable value , and parses the strategy and options parts. @@ -139,20 +140,21 @@ func ReadTopo(ctx context.Context, conn topo.Conn, entryPath string) (*OnlineDDL } // NewOnlineDDL creates a schema change request with self generated UUID and RequestTime -func NewOnlineDDL(keyspace string, table string, sql string, strategy DDLStrategy, options string) (*OnlineDDL, error) { +func NewOnlineDDL(keyspace string, table string, sql string, strategy DDLStrategy, options string, requestContext string) (*OnlineDDL, error) { u, err := CreateUUID() if err != nil { return nil, err } return &OnlineDDL{ - Keyspace: keyspace, - Table: table, - SQL: sql, - UUID: u, - Strategy: strategy, - Options: options, - RequestTime: time.Now().UnixNano(), - Status: OnlineDDLStatusRequested, + Keyspace: keyspace, + Table: table, + SQL: sql, + UUID: u, + Strategy: strategy, + Options: options, + RequestTime: time.Now().UnixNano(), + RequestContext: requestContext, + Status: OnlineDDLStatusRequested, }, nil } diff --git a/go/vt/schemamanager/schemamanager_test.go b/go/vt/schemamanager/schemamanager_test.go index fa83dd3598a..ffa38fa3535 100644 --- a/go/vt/schemamanager/schemamanager_test.go +++ b/go/vt/schemamanager/schemamanager_test.go @@ -94,7 +94,7 @@ func TestSchemaManagerExecutorOpenFail(t *testing.T) { []string{"create table test_table (pk int);"}, false, false, false) controller.SetKeyspace("unknown_keyspace") wr := wrangler.New(logutil.NewConsoleLogger(), newFakeTopo(t), newFakeTabletManagerClient()) - executor := NewTabletExecutor(wr, testWaitReplicasTimeout) + executor := NewTabletExecutor("TestSchemaManagerExecutorOpenFail", wr, testWaitReplicasTimeout) ctx := context.Background() err := Run(ctx, controller, executor) @@ -107,7 +107,7 @@ func TestSchemaManagerExecutorExecuteFail(t *testing.T) { controller := newFakeController( []string{"create table test_table (pk int);"}, false, false, false) wr := wrangler.New(logutil.NewConsoleLogger(), newFakeTopo(t), newFakeTabletManagerClient()) - executor := NewTabletExecutor(wr, testWaitReplicasTimeout) + executor := NewTabletExecutor("TestSchemaManagerExecutorExecuteFail", wr, testWaitReplicasTimeout) ctx := context.Background() err := Run(ctx, controller, executor) @@ -138,7 +138,7 @@ func TestSchemaManagerRun(t *testing.T) { fakeTmc.AddSchemaDefinition("vt_test_keyspace", &tabletmanagerdatapb.SchemaDefinition{}) wr := wrangler.New(logutil.NewConsoleLogger(), newFakeTopo(t), fakeTmc) - executor := NewTabletExecutor(wr, testWaitReplicasTimeout) + executor := NewTabletExecutor("TestSchemaManagerRun", wr, testWaitReplicasTimeout) ctx := context.Background() err := Run(ctx, controller, executor) @@ -184,7 +184,7 @@ func TestSchemaManagerExecutorFail(t *testing.T) { fakeTmc.AddSchemaDefinition("vt_test_keyspace", &tabletmanagerdatapb.SchemaDefinition{}) fakeTmc.EnableExecuteFetchAsDbaError = true wr := wrangler.New(logutil.NewConsoleLogger(), newFakeTopo(t), fakeTmc) - executor := NewTabletExecutor(wr, testWaitReplicasTimeout) + executor := NewTabletExecutor("TestSchemaManagerExecutorFail", wr, testWaitReplicasTimeout) ctx := context.Background() err := Run(ctx, controller, executor) @@ -229,7 +229,7 @@ func TestSchemaManagerRegisterControllerFactory(t *testing.T) { func newFakeExecutor(t *testing.T) *TabletExecutor { wr := wrangler.New(logutil.NewConsoleLogger(), newFakeTopo(t), newFakeTabletManagerClient()) - return NewTabletExecutor(wr, testWaitReplicasTimeout) + return NewTabletExecutor("newFakeExecutor", wr, testWaitReplicasTimeout) } func newFakeTabletManagerClient() *fakeTabletManagerClient { diff --git a/go/vt/schemamanager/tablet_executor.go b/go/vt/schemamanager/tablet_executor.go index 6c7094c5c9f..5e3f54bbb89 100644 --- a/go/vt/schemamanager/tablet_executor.go +++ b/go/vt/schemamanager/tablet_executor.go @@ -34,6 +34,7 @@ import ( // TabletExecutor applies schema changes to all tablets. type TabletExecutor struct { + requestContext string wr *wrangler.Wrangler tablets []*topodatapb.Tablet isClosed bool @@ -44,12 +45,13 @@ type TabletExecutor struct { } // NewTabletExecutor creates a new TabletExecutor instance -func NewTabletExecutor(wr *wrangler.Wrangler, waitReplicasTimeout time.Duration) *TabletExecutor { +func NewTabletExecutor(requestContext string, wr *wrangler.Wrangler, waitReplicasTimeout time.Duration) *TabletExecutor { return &TabletExecutor{ wr: wr, isClosed: true, allowBigSchemaChange: false, waitReplicasTimeout: waitReplicasTimeout, + requestContext: requestContext, } } @@ -283,7 +285,7 @@ func (exec *TabletExecutor) executeOnlineDDL( execResult.ExecutorErr = "Not an online DDL strategy" return } - onlineDDL, err := schema.NewOnlineDDL(exec.keyspace, tableName, sql, strategy, options) + onlineDDL, err := schema.NewOnlineDDL(exec.keyspace, tableName, sql, strategy, options, exec.requestContext) if err != nil { execResult.ExecutorErr = err.Error() return diff --git a/go/vt/schemamanager/tablet_executor_test.go b/go/vt/schemamanager/tablet_executor_test.go index f8e5fb4b354..af0e2924634 100644 --- a/go/vt/schemamanager/tablet_executor_test.go +++ b/go/vt/schemamanager/tablet_executor_test.go @@ -72,7 +72,7 @@ func TestTabletExecutorOpenWithEmptyMasterAlias(t *testing.T) { if err := wr.InitTablet(ctx, tablet, false /*allowMasterOverride*/, true /*createShardAndKeyspace*/, false /*allowUpdate*/); err != nil { t.Fatalf("InitTablet failed: %v", err) } - executor := NewTabletExecutor(wr, testWaitReplicasTimeout) + executor := NewTabletExecutor("TestTabletExecutorOpenWithEmptyMasterAlias", wr, testWaitReplicasTimeout) if err := executor.Open(ctx, "test_keyspace"); err == nil || !strings.Contains(err.Error(), "does not have a master") { t.Fatalf("executor.Open() = '%v', want error", err) } @@ -106,7 +106,7 @@ func TestTabletExecutorValidate(t *testing.T) { }) wr := wrangler.New(logutil.NewConsoleLogger(), newFakeTopo(t), fakeTmc) - executor := NewTabletExecutor(wr, testWaitReplicasTimeout) + executor := NewTabletExecutor("TestTabletExecutorValidate", wr, testWaitReplicasTimeout) ctx := context.Background() sqls := []string{ @@ -196,7 +196,7 @@ func TestTabletExecutorDML(t *testing.T) { }) wr := wrangler.New(logutil.NewConsoleLogger(), newFakeTopo(t), fakeTmc) - executor := NewTabletExecutor(wr, testWaitReplicasTimeout) + executor := NewTabletExecutor("TestTabletExecutorDML", wr, testWaitReplicasTimeout) ctx := context.Background() executor.Open(ctx, "unsharded_keyspace") diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 33b66033b62..ef286c311a9 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -2472,8 +2472,12 @@ func commandApplySchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *fl if err != nil { return err } - - executor := schemamanager.NewTabletExecutor(wr, *waitReplicasTimeout) + executionUUID, err := schema.CreateUUID() + if err != nil { + return err + } + requestContext := fmt.Sprintf("vtctl:%s", executionUUID) + executor := schemamanager.NewTabletExecutor(requestContext, wr, *waitReplicasTimeout) if *allowLongUnavailability { executor.AllowBigSchemaChange() } diff --git a/go/vt/vtctld/api.go b/go/vt/vtctld/api.go index 1b25263096f..01306867f68 100644 --- a/go/vt/vtctld/api.go +++ b/go/vt/vtctld/api.go @@ -32,6 +32,7 @@ import ( "vitess.io/vitess/go/netutil" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/schemamanager" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" @@ -627,9 +628,12 @@ func initAPI(ctx context.Context, ts *topo.Server, actions *ActionRepository, re }) wr := wrangler.New(logger, ts, tmClient) - executor := schemamanager.NewTabletExecutor( - wr, time.Duration(req.ReplicaTimeoutSeconds)*time.Second, - ) + apiCallUUID, err := schema.CreateUUID() + if err != nil { + return err + } + requestContext := fmt.Sprintf("vtctld/api:%s", apiCallUUID) + executor := schemamanager.NewTabletExecutor(requestContext, wr, time.Duration(req.ReplicaTimeoutSeconds)*time.Second) return schemamanager.Run(ctx, schemamanager.NewUIController(req.SQL, req.Keyspace, w), executor) diff --git a/go/vt/vtctld/schema.go b/go/vt/vtctld/schema.go index 299b21b651c..358ac1822dc 100644 --- a/go/vt/vtctld/schema.go +++ b/go/vt/vtctld/schema.go @@ -95,9 +95,10 @@ func reviewMigrationRequest(ctx context.Context, ts *topo.Server, tmClient tmcli strategy, options, requested_timestamp, + migration_context, migration_status ) VALUES ( - %a, %a, %a, %a, %a, %a, %a, %a, FROM_UNIXTIME(%a), %a + %a, %a, %a, %a, %a, %a, %a, %a, FROM_UNIXTIME(%a), %a, %a )` parsed := sqlparser.BuildParsedQuery(sqlInsertSchemaMigration, "_vt", ":migration_uuid", @@ -109,6 +110,7 @@ func reviewMigrationRequest(ctx context.Context, ts *topo.Server, tmClient tmcli ":strategy", ":options", ":requested_timestamp", + ":migration_context", ":migration_status", ) bindVars := map[string]*querypb.BindVariable{ @@ -121,6 +123,7 @@ func reviewMigrationRequest(ctx context.Context, ts *topo.Server, tmClient tmcli "strategy": sqltypes.StringBindVariable(string(onlineDDL.Strategy)), "options": sqltypes.StringBindVariable(onlineDDL.Options), "requested_timestamp": sqltypes.Int64BindVariable(onlineDDL.RequestTimeSeconds()), + "migration_context": sqltypes.StringBindVariable(onlineDDL.RequestContext), "migration_status": sqltypes.StringBindVariable(string(onlineDDL.Status)), } diff --git a/go/vt/vtgate/engine/online_ddl.go b/go/vt/vtgate/engine/online_ddl.go index 48a6c5d69f4..3440aaacbe3 100644 --- a/go/vt/vtgate/engine/online_ddl.go +++ b/go/vt/vtgate/engine/online_ddl.go @@ -52,24 +52,24 @@ func (v *OnlineDDL) description() PrimitiveDescription { } } -//RouteType implements the Primitive interface +// RouteType implements the Primitive interface func (v *OnlineDDL) RouteType() string { return "OnlineDDL" } -//GetKeyspaceName implements the Primitive interface +// GetKeyspaceName implements the Primitive interface func (v *OnlineDDL) GetKeyspaceName() string { return v.Keyspace.Name } -//GetTableName implements the Primitive interface +// GetTableName implements the Primitive interface func (v *OnlineDDL) GetTableName() string { return v.DDL.GetTable().Name.String() } -//Execute implements the Primitive interface +// Execute implements the Primitive interface func (v *OnlineDDL) Execute(vcursor VCursor, bindVars map[string]*query.BindVariable, wantfields bool) (result *sqltypes.Result, err error) { - onlineDDL, err := schema.NewOnlineDDL(v.GetKeyspaceName(), v.GetTableName(), v.SQL, v.Strategy, v.Options) + onlineDDL, err := schema.NewOnlineDDL(v.GetKeyspaceName(), v.GetTableName(), v.SQL, v.Strategy, v.Options, "vtgate") if err != nil { return result, err } diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index 859797b7380..99d2691e049 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -80,9 +80,10 @@ var vexecInsertTemplates = []string{ strategy, options, requested_timestamp, + migration_context, migration_status ) VALUES ( - 'val', 'val', 'val', 'val', 'val', 'val', 'val', 'val', FROM_UNIXTIME(0), 'val' + 'val', 'val', 'val', 'val', 'val', 'val', 'val', 'val', FROM_UNIXTIME(0), 'val', 'val' )`, } diff --git a/go/vt/vttablet/onlineddl/schema.go b/go/vt/vttablet/onlineddl/schema.go index 2d16af5c57b..8c323b429f0 100644 --- a/go/vt/vttablet/onlineddl/schema.go +++ b/go/vt/vttablet/onlineddl/schema.go @@ -28,7 +28,7 @@ const ( id bigint(20) unsigned NOT NULL AUTO_INCREMENT, migration_uuid varchar(64) NOT NULL, keyspace varchar(256) NOT NULL, - shard varchar(256) NOT NULL, + shard varchar(255) NOT NULL, mysql_schema varchar(128) NOT NULL, mysql_table varchar(128) NOT NULL, migration_statement text NOT NULL, @@ -56,6 +56,7 @@ const ( alterSchemaMigrationsTableTabletFailure = "ALTER TABLE %s.schema_migrations add column tablet_failure tinyint unsigned NOT NULL DEFAULT 0" alterSchemaMigrationsTableTabletFailureIndex = "ALTER TABLE %s.schema_migrations add KEY tablet_failure_idx (tablet_failure, migration_status, retries)" alterSchemaMigrationsTableProgress = "ALTER TABLE %s.schema_migrations add column progress float NOT NULL DEFAULT 0" + alterSchemaMigrationsTableContext = "ALTER TABLE %s.schema_migrations add column migration_context varchar(1024) NOT NULL DEFAULT ''" sqlScheduleSingleMigration = `UPDATE %s.schema_migrations SET @@ -238,4 +239,5 @@ var applyDDL = []string{ fmt.Sprintf(alterSchemaMigrationsTableTabletFailure, "_vt"), fmt.Sprintf(alterSchemaMigrationsTableTabletFailureIndex, "_vt"), fmt.Sprintf(alterSchemaMigrationsTableProgress, "_vt"), + fmt.Sprintf(alterSchemaMigrationsTableContext, "_vt"), } diff --git a/go/vt/wrangler/vexec_plan.go b/go/vt/wrangler/vexec_plan.go index 955bb315cb2..bb4c759ccbf 100644 --- a/go/vt/wrangler/vexec_plan.go +++ b/go/vt/wrangler/vexec_plan.go @@ -136,9 +136,10 @@ func newSchemaMigrationsPlanner(vx *vexec) vexecPlanner { strategy, options, requested_timestamp, + migration_context, migration_status ) VALUES ( - 'val', 'val', 'val', 'val', 'val', 'val', 'val', 'val', FROM_UNIXTIME(0), 'val' + 'val', 'val', 'val', 'val', 'val', 'val', 'val', 'val', FROM_UNIXTIME(0), 'val', 'val' )`, }, },