Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OnlineDDL: request_context/migration_context #7082

Merged
merged 3 commits into from
Dec 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/cmd/vtctld/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func initSchema() {
err = schemamanager.Run(
ctx,
controller,
schemamanager.NewTabletExecutor(wr, *schemaChangeReplicasTimeout),
schemamanager.NewTabletExecutor("vtctld/schema", wr, *schemaChangeReplicasTimeout),
)
if err != nil {
log.Errorf("Schema change failed, error: %v", err)
Expand Down
42 changes: 22 additions & 20 deletions go/vt/schema/online_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,18 @@ const (

// OnlineDDL encapsulates the relevant information in an online schema change request
type OnlineDDL struct {
Keyspace string `json:"keyspace,omitempty"`
Table string `json:"table,omitempty"`
Schema string `json:"schema,omitempty"`
SQL string `json:"sql,omitempty"`
UUID string `json:"uuid,omitempty"`
Strategy DDLStrategy `json:"strategy,omitempty"`
Options string `json:"options,omitempty"`
RequestTime int64 `json:"time_created,omitempty"`
Status OnlineDDLStatus `json:"status,omitempty"`
TabletAlias string `json:"tablet,omitempty"`
Retries int64 `json:"retries,omitempty"`
Keyspace string `json:"keyspace,omitempty"`
Table string `json:"table,omitempty"`
Schema string `json:"schema,omitempty"`
SQL string `json:"sql,omitempty"`
UUID string `json:"uuid,omitempty"`
Strategy DDLStrategy `json:"strategy,omitempty"`
Options string `json:"options,omitempty"`
RequestTime int64 `json:"time_created,omitempty"`
RequestContext string `json:"context,omitempty"`
Status OnlineDDLStatus `json:"status,omitempty"`
TabletAlias string `json:"tablet,omitempty"`
Retries int64 `json:"retries,omitempty"`
}

// ParseDDLStrategy validates the given ddl_strategy variable value , and parses the strategy and options parts.
Expand Down Expand Up @@ -139,20 +140,21 @@ func ReadTopo(ctx context.Context, conn topo.Conn, entryPath string) (*OnlineDDL
}

// NewOnlineDDL creates a schema change request with self generated UUID and RequestTime
func NewOnlineDDL(keyspace string, table string, sql string, strategy DDLStrategy, options string) (*OnlineDDL, error) {
func NewOnlineDDL(keyspace string, table string, sql string, strategy DDLStrategy, options string, requestContext string) (*OnlineDDL, error) {
u, err := CreateUUID()
if err != nil {
return nil, err
}
return &OnlineDDL{
Keyspace: keyspace,
Table: table,
SQL: sql,
UUID: u,
Strategy: strategy,
Options: options,
RequestTime: time.Now().UnixNano(),
Status: OnlineDDLStatusRequested,
Keyspace: keyspace,
Table: table,
SQL: sql,
UUID: u,
Strategy: strategy,
Options: options,
RequestTime: time.Now().UnixNano(),
RequestContext: requestContext,
Status: OnlineDDLStatusRequested,
}, nil
}

Expand Down
10 changes: 5 additions & 5 deletions go/vt/schemamanager/schemamanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestSchemaManagerExecutorOpenFail(t *testing.T) {
[]string{"create table test_table (pk int);"}, false, false, false)
controller.SetKeyspace("unknown_keyspace")
wr := wrangler.New(logutil.NewConsoleLogger(), newFakeTopo(t), newFakeTabletManagerClient())
executor := NewTabletExecutor(wr, testWaitReplicasTimeout)
executor := NewTabletExecutor("TestSchemaManagerExecutorOpenFail", wr, testWaitReplicasTimeout)
ctx := context.Background()

err := Run(ctx, controller, executor)
Expand All @@ -107,7 +107,7 @@ func TestSchemaManagerExecutorExecuteFail(t *testing.T) {
controller := newFakeController(
[]string{"create table test_table (pk int);"}, false, false, false)
wr := wrangler.New(logutil.NewConsoleLogger(), newFakeTopo(t), newFakeTabletManagerClient())
executor := NewTabletExecutor(wr, testWaitReplicasTimeout)
executor := NewTabletExecutor("TestSchemaManagerExecutorExecuteFail", wr, testWaitReplicasTimeout)
ctx := context.Background()

err := Run(ctx, controller, executor)
Expand Down Expand Up @@ -138,7 +138,7 @@ func TestSchemaManagerRun(t *testing.T) {
fakeTmc.AddSchemaDefinition("vt_test_keyspace", &tabletmanagerdatapb.SchemaDefinition{})

wr := wrangler.New(logutil.NewConsoleLogger(), newFakeTopo(t), fakeTmc)
executor := NewTabletExecutor(wr, testWaitReplicasTimeout)
executor := NewTabletExecutor("TestSchemaManagerRun", wr, testWaitReplicasTimeout)

ctx := context.Background()
err := Run(ctx, controller, executor)
Expand Down Expand Up @@ -184,7 +184,7 @@ func TestSchemaManagerExecutorFail(t *testing.T) {
fakeTmc.AddSchemaDefinition("vt_test_keyspace", &tabletmanagerdatapb.SchemaDefinition{})
fakeTmc.EnableExecuteFetchAsDbaError = true
wr := wrangler.New(logutil.NewConsoleLogger(), newFakeTopo(t), fakeTmc)
executor := NewTabletExecutor(wr, testWaitReplicasTimeout)
executor := NewTabletExecutor("TestSchemaManagerExecutorFail", wr, testWaitReplicasTimeout)

ctx := context.Background()
err := Run(ctx, controller, executor)
Expand Down Expand Up @@ -229,7 +229,7 @@ func TestSchemaManagerRegisterControllerFactory(t *testing.T) {

func newFakeExecutor(t *testing.T) *TabletExecutor {
wr := wrangler.New(logutil.NewConsoleLogger(), newFakeTopo(t), newFakeTabletManagerClient())
return NewTabletExecutor(wr, testWaitReplicasTimeout)
return NewTabletExecutor("newFakeExecutor", wr, testWaitReplicasTimeout)
}

func newFakeTabletManagerClient() *fakeTabletManagerClient {
Expand Down
6 changes: 4 additions & 2 deletions go/vt/schemamanager/tablet_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

// TabletExecutor applies schema changes to all tablets.
type TabletExecutor struct {
requestContext string
wr *wrangler.Wrangler
tablets []*topodatapb.Tablet
isClosed bool
Expand All @@ -44,12 +45,13 @@ type TabletExecutor struct {
}

// NewTabletExecutor creates a new TabletExecutor instance
func NewTabletExecutor(wr *wrangler.Wrangler, waitReplicasTimeout time.Duration) *TabletExecutor {
func NewTabletExecutor(requestContext string, wr *wrangler.Wrangler, waitReplicasTimeout time.Duration) *TabletExecutor {
return &TabletExecutor{
wr: wr,
isClosed: true,
allowBigSchemaChange: false,
waitReplicasTimeout: waitReplicasTimeout,
requestContext: requestContext,
}
}

Expand Down Expand Up @@ -283,7 +285,7 @@ func (exec *TabletExecutor) executeOnlineDDL(
execResult.ExecutorErr = "Not an online DDL strategy"
return
}
onlineDDL, err := schema.NewOnlineDDL(exec.keyspace, tableName, sql, strategy, options)
onlineDDL, err := schema.NewOnlineDDL(exec.keyspace, tableName, sql, strategy, options, exec.requestContext)
if err != nil {
execResult.ExecutorErr = err.Error()
return
Expand Down
6 changes: 3 additions & 3 deletions go/vt/schemamanager/tablet_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestTabletExecutorOpenWithEmptyMasterAlias(t *testing.T) {
if err := wr.InitTablet(ctx, tablet, false /*allowMasterOverride*/, true /*createShardAndKeyspace*/, false /*allowUpdate*/); err != nil {
t.Fatalf("InitTablet failed: %v", err)
}
executor := NewTabletExecutor(wr, testWaitReplicasTimeout)
executor := NewTabletExecutor("TestTabletExecutorOpenWithEmptyMasterAlias", wr, testWaitReplicasTimeout)
if err := executor.Open(ctx, "test_keyspace"); err == nil || !strings.Contains(err.Error(), "does not have a master") {
t.Fatalf("executor.Open() = '%v', want error", err)
}
Expand Down Expand Up @@ -106,7 +106,7 @@ func TestTabletExecutorValidate(t *testing.T) {
})

wr := wrangler.New(logutil.NewConsoleLogger(), newFakeTopo(t), fakeTmc)
executor := NewTabletExecutor(wr, testWaitReplicasTimeout)
executor := NewTabletExecutor("TestTabletExecutorValidate", wr, testWaitReplicasTimeout)
ctx := context.Background()

sqls := []string{
Expand Down Expand Up @@ -196,7 +196,7 @@ func TestTabletExecutorDML(t *testing.T) {
})

wr := wrangler.New(logutil.NewConsoleLogger(), newFakeTopo(t), fakeTmc)
executor := NewTabletExecutor(wr, testWaitReplicasTimeout)
executor := NewTabletExecutor("TestTabletExecutorDML", wr, testWaitReplicasTimeout)
ctx := context.Background()

executor.Open(ctx, "unsharded_keyspace")
Expand Down
8 changes: 6 additions & 2 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2472,8 +2472,12 @@ func commandApplySchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *fl
if err != nil {
return err
}

executor := schemamanager.NewTabletExecutor(wr, *waitReplicasTimeout)
executionUUID, err := schema.CreateUUID()
if err != nil {
return err
}
requestContext := fmt.Sprintf("vtctl:%s", executionUUID)
executor := schemamanager.NewTabletExecutor(requestContext, wr, *waitReplicasTimeout)
if *allowLongUnavailability {
executor.AllowBigSchemaChange()
}
Expand Down
10 changes: 7 additions & 3 deletions go/vt/vtctld/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/schemamanager"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
Expand Down Expand Up @@ -627,9 +628,12 @@ func initAPI(ctx context.Context, ts *topo.Server, actions *ActionRepository, re
})
wr := wrangler.New(logger, ts, tmClient)

executor := schemamanager.NewTabletExecutor(
wr, time.Duration(req.ReplicaTimeoutSeconds)*time.Second,
)
apiCallUUID, err := schema.CreateUUID()
if err != nil {
return err
}
requestContext := fmt.Sprintf("vtctld/api:%s", apiCallUUID)
executor := schemamanager.NewTabletExecutor(requestContext, wr, time.Duration(req.ReplicaTimeoutSeconds)*time.Second)

return schemamanager.Run(ctx,
schemamanager.NewUIController(req.SQL, req.Keyspace, w), executor)
Expand Down
5 changes: 4 additions & 1 deletion go/vt/vtctld/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,10 @@ func reviewMigrationRequest(ctx context.Context, ts *topo.Server, tmClient tmcli
strategy,
options,
requested_timestamp,
migration_context,
migration_status
) VALUES (
%a, %a, %a, %a, %a, %a, %a, %a, FROM_UNIXTIME(%a), %a
%a, %a, %a, %a, %a, %a, %a, %a, FROM_UNIXTIME(%a), %a, %a
)`
parsed := sqlparser.BuildParsedQuery(sqlInsertSchemaMigration, "_vt",
":migration_uuid",
Expand All @@ -109,6 +110,7 @@ func reviewMigrationRequest(ctx context.Context, ts *topo.Server, tmClient tmcli
":strategy",
":options",
":requested_timestamp",
":migration_context",
":migration_status",
)
bindVars := map[string]*querypb.BindVariable{
Expand All @@ -121,6 +123,7 @@ func reviewMigrationRequest(ctx context.Context, ts *topo.Server, tmClient tmcli
"strategy": sqltypes.StringBindVariable(string(onlineDDL.Strategy)),
"options": sqltypes.StringBindVariable(onlineDDL.Options),
"requested_timestamp": sqltypes.Int64BindVariable(onlineDDL.RequestTimeSeconds()),
"migration_context": sqltypes.StringBindVariable(onlineDDL.RequestContext),
"migration_status": sqltypes.StringBindVariable(string(onlineDDL.Status)),
}

Expand Down
10 changes: 5 additions & 5 deletions go/vt/vtgate/engine/online_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,24 +52,24 @@ func (v *OnlineDDL) description() PrimitiveDescription {
}
}

//RouteType implements the Primitive interface
// RouteType implements the Primitive interface
func (v *OnlineDDL) RouteType() string {
return "OnlineDDL"
}

//GetKeyspaceName implements the Primitive interface
// GetKeyspaceName implements the Primitive interface
func (v *OnlineDDL) GetKeyspaceName() string {
return v.Keyspace.Name
}

//GetTableName implements the Primitive interface
// GetTableName implements the Primitive interface
func (v *OnlineDDL) GetTableName() string {
return v.DDL.GetTable().Name.String()
}

//Execute implements the Primitive interface
// Execute implements the Primitive interface
func (v *OnlineDDL) Execute(vcursor VCursor, bindVars map[string]*query.BindVariable, wantfields bool) (result *sqltypes.Result, err error) {
onlineDDL, err := schema.NewOnlineDDL(v.GetKeyspaceName(), v.GetTableName(), v.SQL, v.Strategy, v.Options)
onlineDDL, err := schema.NewOnlineDDL(v.GetKeyspaceName(), v.GetTableName(), v.SQL, v.Strategy, v.Options, "vtgate")
if err != nil {
return result, err
}
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,10 @@ var vexecInsertTemplates = []string{
strategy,
options,
requested_timestamp,
migration_context,
migration_status
) VALUES (
'val', 'val', 'val', 'val', 'val', 'val', 'val', 'val', FROM_UNIXTIME(0), 'val'
'val', 'val', 'val', 'val', 'val', 'val', 'val', 'val', FROM_UNIXTIME(0), 'val', 'val'
)`,
}

Expand Down
4 changes: 3 additions & 1 deletion go/vt/vttablet/onlineddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const (
id bigint(20) unsigned NOT NULL AUTO_INCREMENT,
migration_uuid varchar(64) NOT NULL,
keyspace varchar(256) NOT NULL,
shard varchar(256) NOT NULL,
shard varchar(255) NOT NULL,
mysql_schema varchar(128) NOT NULL,
mysql_table varchar(128) NOT NULL,
migration_statement text NOT NULL,
Expand Down Expand Up @@ -56,6 +56,7 @@ const (
alterSchemaMigrationsTableTabletFailure = "ALTER TABLE %s.schema_migrations add column tablet_failure tinyint unsigned NOT NULL DEFAULT 0"
alterSchemaMigrationsTableTabletFailureIndex = "ALTER TABLE %s.schema_migrations add KEY tablet_failure_idx (tablet_failure, migration_status, retries)"
alterSchemaMigrationsTableProgress = "ALTER TABLE %s.schema_migrations add column progress float NOT NULL DEFAULT 0"
alterSchemaMigrationsTableContext = "ALTER TABLE %s.schema_migrations add column migration_context varchar(1024) NOT NULL DEFAULT ''"

sqlScheduleSingleMigration = `UPDATE %s.schema_migrations
SET
Expand Down Expand Up @@ -238,4 +239,5 @@ var applyDDL = []string{
fmt.Sprintf(alterSchemaMigrationsTableTabletFailure, "_vt"),
fmt.Sprintf(alterSchemaMigrationsTableTabletFailureIndex, "_vt"),
fmt.Sprintf(alterSchemaMigrationsTableProgress, "_vt"),
fmt.Sprintf(alterSchemaMigrationsTableContext, "_vt"),
}
3 changes: 2 additions & 1 deletion go/vt/wrangler/vexec_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,10 @@ func newSchemaMigrationsPlanner(vx *vexec) vexecPlanner {
strategy,
options,
requested_timestamp,
migration_context,
migration_status
) VALUES (
'val', 'val', 'val', 'val', 'val', 'val', 'val', 'val', FROM_UNIXTIME(0), 'val'
'val', 'val', 'val', 'val', 'val', 'val', 'val', 'val', FROM_UNIXTIME(0), 'val', 'val'
)`,
},
},
Expand Down