Skip to content

Commit

Permalink
Merge pull request #7082 from planetscale/online-ddl-request-context
Browse files Browse the repository at this point in the history
OnlineDDL: request_context/migration_context
  • Loading branch information
deepthi authored Dec 3, 2020
2 parents 2c3b0f0 + 397a442 commit 84f49b2
Show file tree
Hide file tree
Showing 12 changed files with 64 additions and 45 deletions.
2 changes: 1 addition & 1 deletion go/cmd/vtctld/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func initSchema() {
err = schemamanager.Run(
ctx,
controller,
schemamanager.NewTabletExecutor(wr, *schemaChangeReplicasTimeout),
schemamanager.NewTabletExecutor("vtctld/schema", wr, *schemaChangeReplicasTimeout),
)
if err != nil {
log.Errorf("Schema change failed, error: %v", err)
Expand Down
42 changes: 22 additions & 20 deletions go/vt/schema/online_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,18 @@ const (

// OnlineDDL encapsulates the relevant information in an online schema change request
type OnlineDDL struct {
Keyspace string `json:"keyspace,omitempty"`
Table string `json:"table,omitempty"`
Schema string `json:"schema,omitempty"`
SQL string `json:"sql,omitempty"`
UUID string `json:"uuid,omitempty"`
Strategy DDLStrategy `json:"strategy,omitempty"`
Options string `json:"options,omitempty"`
RequestTime int64 `json:"time_created,omitempty"`
Status OnlineDDLStatus `json:"status,omitempty"`
TabletAlias string `json:"tablet,omitempty"`
Retries int64 `json:"retries,omitempty"`
Keyspace string `json:"keyspace,omitempty"`
Table string `json:"table,omitempty"`
Schema string `json:"schema,omitempty"`
SQL string `json:"sql,omitempty"`
UUID string `json:"uuid,omitempty"`
Strategy DDLStrategy `json:"strategy,omitempty"`
Options string `json:"options,omitempty"`
RequestTime int64 `json:"time_created,omitempty"`
RequestContext string `json:"context,omitempty"`
Status OnlineDDLStatus `json:"status,omitempty"`
TabletAlias string `json:"tablet,omitempty"`
Retries int64 `json:"retries,omitempty"`
}

// ParseDDLStrategy validates the given ddl_strategy variable value , and parses the strategy and options parts.
Expand Down Expand Up @@ -139,20 +140,21 @@ func ReadTopo(ctx context.Context, conn topo.Conn, entryPath string) (*OnlineDDL
}

// NewOnlineDDL creates a schema change request with self generated UUID and RequestTime
func NewOnlineDDL(keyspace string, table string, sql string, strategy DDLStrategy, options string) (*OnlineDDL, error) {
func NewOnlineDDL(keyspace string, table string, sql string, strategy DDLStrategy, options string, requestContext string) (*OnlineDDL, error) {
u, err := CreateUUID()
if err != nil {
return nil, err
}
return &OnlineDDL{
Keyspace: keyspace,
Table: table,
SQL: sql,
UUID: u,
Strategy: strategy,
Options: options,
RequestTime: time.Now().UnixNano(),
Status: OnlineDDLStatusRequested,
Keyspace: keyspace,
Table: table,
SQL: sql,
UUID: u,
Strategy: strategy,
Options: options,
RequestTime: time.Now().UnixNano(),
RequestContext: requestContext,
Status: OnlineDDLStatusRequested,
}, nil
}

Expand Down
10 changes: 5 additions & 5 deletions go/vt/schemamanager/schemamanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestSchemaManagerExecutorOpenFail(t *testing.T) {
[]string{"create table test_table (pk int);"}, false, false, false)
controller.SetKeyspace("unknown_keyspace")
wr := wrangler.New(logutil.NewConsoleLogger(), newFakeTopo(t), newFakeTabletManagerClient())
executor := NewTabletExecutor(wr, testWaitReplicasTimeout)
executor := NewTabletExecutor("TestSchemaManagerExecutorOpenFail", wr, testWaitReplicasTimeout)
ctx := context.Background()

err := Run(ctx, controller, executor)
Expand All @@ -107,7 +107,7 @@ func TestSchemaManagerExecutorExecuteFail(t *testing.T) {
controller := newFakeController(
[]string{"create table test_table (pk int);"}, false, false, false)
wr := wrangler.New(logutil.NewConsoleLogger(), newFakeTopo(t), newFakeTabletManagerClient())
executor := NewTabletExecutor(wr, testWaitReplicasTimeout)
executor := NewTabletExecutor("TestSchemaManagerExecutorExecuteFail", wr, testWaitReplicasTimeout)
ctx := context.Background()

err := Run(ctx, controller, executor)
Expand Down Expand Up @@ -138,7 +138,7 @@ func TestSchemaManagerRun(t *testing.T) {
fakeTmc.AddSchemaDefinition("vt_test_keyspace", &tabletmanagerdatapb.SchemaDefinition{})

wr := wrangler.New(logutil.NewConsoleLogger(), newFakeTopo(t), fakeTmc)
executor := NewTabletExecutor(wr, testWaitReplicasTimeout)
executor := NewTabletExecutor("TestSchemaManagerRun", wr, testWaitReplicasTimeout)

ctx := context.Background()
err := Run(ctx, controller, executor)
Expand Down Expand Up @@ -184,7 +184,7 @@ func TestSchemaManagerExecutorFail(t *testing.T) {
fakeTmc.AddSchemaDefinition("vt_test_keyspace", &tabletmanagerdatapb.SchemaDefinition{})
fakeTmc.EnableExecuteFetchAsDbaError = true
wr := wrangler.New(logutil.NewConsoleLogger(), newFakeTopo(t), fakeTmc)
executor := NewTabletExecutor(wr, testWaitReplicasTimeout)
executor := NewTabletExecutor("TestSchemaManagerExecutorFail", wr, testWaitReplicasTimeout)

ctx := context.Background()
err := Run(ctx, controller, executor)
Expand Down Expand Up @@ -229,7 +229,7 @@ func TestSchemaManagerRegisterControllerFactory(t *testing.T) {

func newFakeExecutor(t *testing.T) *TabletExecutor {
wr := wrangler.New(logutil.NewConsoleLogger(), newFakeTopo(t), newFakeTabletManagerClient())
return NewTabletExecutor(wr, testWaitReplicasTimeout)
return NewTabletExecutor("newFakeExecutor", wr, testWaitReplicasTimeout)
}

func newFakeTabletManagerClient() *fakeTabletManagerClient {
Expand Down
6 changes: 4 additions & 2 deletions go/vt/schemamanager/tablet_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

// TabletExecutor applies schema changes to all tablets.
type TabletExecutor struct {
requestContext string
wr *wrangler.Wrangler
tablets []*topodatapb.Tablet
isClosed bool
Expand All @@ -44,12 +45,13 @@ type TabletExecutor struct {
}

// NewTabletExecutor creates a new TabletExecutor instance
func NewTabletExecutor(wr *wrangler.Wrangler, waitReplicasTimeout time.Duration) *TabletExecutor {
func NewTabletExecutor(requestContext string, wr *wrangler.Wrangler, waitReplicasTimeout time.Duration) *TabletExecutor {
return &TabletExecutor{
wr: wr,
isClosed: true,
allowBigSchemaChange: false,
waitReplicasTimeout: waitReplicasTimeout,
requestContext: requestContext,
}
}

Expand Down Expand Up @@ -283,7 +285,7 @@ func (exec *TabletExecutor) executeOnlineDDL(
execResult.ExecutorErr = "Not an online DDL strategy"
return
}
onlineDDL, err := schema.NewOnlineDDL(exec.keyspace, tableName, sql, strategy, options)
onlineDDL, err := schema.NewOnlineDDL(exec.keyspace, tableName, sql, strategy, options, exec.requestContext)
if err != nil {
execResult.ExecutorErr = err.Error()
return
Expand Down
6 changes: 3 additions & 3 deletions go/vt/schemamanager/tablet_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestTabletExecutorOpenWithEmptyMasterAlias(t *testing.T) {
if err := wr.InitTablet(ctx, tablet, false /*allowMasterOverride*/, true /*createShardAndKeyspace*/, false /*allowUpdate*/); err != nil {
t.Fatalf("InitTablet failed: %v", err)
}
executor := NewTabletExecutor(wr, testWaitReplicasTimeout)
executor := NewTabletExecutor("TestTabletExecutorOpenWithEmptyMasterAlias", wr, testWaitReplicasTimeout)
if err := executor.Open(ctx, "test_keyspace"); err == nil || !strings.Contains(err.Error(), "does not have a master") {
t.Fatalf("executor.Open() = '%v', want error", err)
}
Expand Down Expand Up @@ -106,7 +106,7 @@ func TestTabletExecutorValidate(t *testing.T) {
})

wr := wrangler.New(logutil.NewConsoleLogger(), newFakeTopo(t), fakeTmc)
executor := NewTabletExecutor(wr, testWaitReplicasTimeout)
executor := NewTabletExecutor("TestTabletExecutorValidate", wr, testWaitReplicasTimeout)
ctx := context.Background()

sqls := []string{
Expand Down Expand Up @@ -196,7 +196,7 @@ func TestTabletExecutorDML(t *testing.T) {
})

wr := wrangler.New(logutil.NewConsoleLogger(), newFakeTopo(t), fakeTmc)
executor := NewTabletExecutor(wr, testWaitReplicasTimeout)
executor := NewTabletExecutor("TestTabletExecutorDML", wr, testWaitReplicasTimeout)
ctx := context.Background()

executor.Open(ctx, "unsharded_keyspace")
Expand Down
8 changes: 6 additions & 2 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2472,8 +2472,12 @@ func commandApplySchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *fl
if err != nil {
return err
}

executor := schemamanager.NewTabletExecutor(wr, *waitReplicasTimeout)
executionUUID, err := schema.CreateUUID()
if err != nil {
return err
}
requestContext := fmt.Sprintf("vtctl:%s", executionUUID)
executor := schemamanager.NewTabletExecutor(requestContext, wr, *waitReplicasTimeout)
if *allowLongUnavailability {
executor.AllowBigSchemaChange()
}
Expand Down
10 changes: 7 additions & 3 deletions go/vt/vtctld/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/schemamanager"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
Expand Down Expand Up @@ -627,9 +628,12 @@ func initAPI(ctx context.Context, ts *topo.Server, actions *ActionRepository, re
})
wr := wrangler.New(logger, ts, tmClient)

executor := schemamanager.NewTabletExecutor(
wr, time.Duration(req.ReplicaTimeoutSeconds)*time.Second,
)
apiCallUUID, err := schema.CreateUUID()
if err != nil {
return err
}
requestContext := fmt.Sprintf("vtctld/api:%s", apiCallUUID)
executor := schemamanager.NewTabletExecutor(requestContext, wr, time.Duration(req.ReplicaTimeoutSeconds)*time.Second)

return schemamanager.Run(ctx,
schemamanager.NewUIController(req.SQL, req.Keyspace, w), executor)
Expand Down
5 changes: 4 additions & 1 deletion go/vt/vtctld/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,10 @@ func reviewMigrationRequest(ctx context.Context, ts *topo.Server, tmClient tmcli
strategy,
options,
requested_timestamp,
migration_context,
migration_status
) VALUES (
%a, %a, %a, %a, %a, %a, %a, %a, FROM_UNIXTIME(%a), %a
%a, %a, %a, %a, %a, %a, %a, %a, FROM_UNIXTIME(%a), %a, %a
)`
parsed := sqlparser.BuildParsedQuery(sqlInsertSchemaMigration, "_vt",
":migration_uuid",
Expand All @@ -109,6 +110,7 @@ func reviewMigrationRequest(ctx context.Context, ts *topo.Server, tmClient tmcli
":strategy",
":options",
":requested_timestamp",
":migration_context",
":migration_status",
)
bindVars := map[string]*querypb.BindVariable{
Expand All @@ -121,6 +123,7 @@ func reviewMigrationRequest(ctx context.Context, ts *topo.Server, tmClient tmcli
"strategy": sqltypes.StringBindVariable(string(onlineDDL.Strategy)),
"options": sqltypes.StringBindVariable(onlineDDL.Options),
"requested_timestamp": sqltypes.Int64BindVariable(onlineDDL.RequestTimeSeconds()),
"migration_context": sqltypes.StringBindVariable(onlineDDL.RequestContext),
"migration_status": sqltypes.StringBindVariable(string(onlineDDL.Status)),
}

Expand Down
10 changes: 5 additions & 5 deletions go/vt/vtgate/engine/online_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,24 +52,24 @@ func (v *OnlineDDL) description() PrimitiveDescription {
}
}

//RouteType implements the Primitive interface
// RouteType implements the Primitive interface
func (v *OnlineDDL) RouteType() string {
return "OnlineDDL"
}

//GetKeyspaceName implements the Primitive interface
// GetKeyspaceName implements the Primitive interface
func (v *OnlineDDL) GetKeyspaceName() string {
return v.Keyspace.Name
}

//GetTableName implements the Primitive interface
// GetTableName implements the Primitive interface
func (v *OnlineDDL) GetTableName() string {
return v.DDL.GetTable().Name.String()
}

//Execute implements the Primitive interface
// Execute implements the Primitive interface
func (v *OnlineDDL) Execute(vcursor VCursor, bindVars map[string]*query.BindVariable, wantfields bool) (result *sqltypes.Result, err error) {
onlineDDL, err := schema.NewOnlineDDL(v.GetKeyspaceName(), v.GetTableName(), v.SQL, v.Strategy, v.Options)
onlineDDL, err := schema.NewOnlineDDL(v.GetKeyspaceName(), v.GetTableName(), v.SQL, v.Strategy, v.Options, "vtgate")
if err != nil {
return result, err
}
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,10 @@ var vexecInsertTemplates = []string{
strategy,
options,
requested_timestamp,
migration_context,
migration_status
) VALUES (
'val', 'val', 'val', 'val', 'val', 'val', 'val', 'val', FROM_UNIXTIME(0), 'val'
'val', 'val', 'val', 'val', 'val', 'val', 'val', 'val', FROM_UNIXTIME(0), 'val', 'val'
)`,
}

Expand Down
4 changes: 3 additions & 1 deletion go/vt/vttablet/onlineddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const (
id bigint(20) unsigned NOT NULL AUTO_INCREMENT,
migration_uuid varchar(64) NOT NULL,
keyspace varchar(256) NOT NULL,
shard varchar(256) NOT NULL,
shard varchar(255) NOT NULL,
mysql_schema varchar(128) NOT NULL,
mysql_table varchar(128) NOT NULL,
migration_statement text NOT NULL,
Expand Down Expand Up @@ -56,6 +56,7 @@ const (
alterSchemaMigrationsTableTabletFailure = "ALTER TABLE %s.schema_migrations add column tablet_failure tinyint unsigned NOT NULL DEFAULT 0"
alterSchemaMigrationsTableTabletFailureIndex = "ALTER TABLE %s.schema_migrations add KEY tablet_failure_idx (tablet_failure, migration_status, retries)"
alterSchemaMigrationsTableProgress = "ALTER TABLE %s.schema_migrations add column progress float NOT NULL DEFAULT 0"
alterSchemaMigrationsTableContext = "ALTER TABLE %s.schema_migrations add column migration_context varchar(1024) NOT NULL DEFAULT ''"

sqlScheduleSingleMigration = `UPDATE %s.schema_migrations
SET
Expand Down Expand Up @@ -238,4 +239,5 @@ var applyDDL = []string{
fmt.Sprintf(alterSchemaMigrationsTableTabletFailure, "_vt"),
fmt.Sprintf(alterSchemaMigrationsTableTabletFailureIndex, "_vt"),
fmt.Sprintf(alterSchemaMigrationsTableProgress, "_vt"),
fmt.Sprintf(alterSchemaMigrationsTableContext, "_vt"),
}
3 changes: 2 additions & 1 deletion go/vt/wrangler/vexec_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,10 @@ func newSchemaMigrationsPlanner(vx *vexec) vexecPlanner {
strategy,
options,
requested_timestamp,
migration_context,
migration_status
) VALUES (
'val', 'val', 'val', 'val', 'val', 'val', 'val', 'val', FROM_UNIXTIME(0), 'val'
'val', 'val', 'val', 'val', 'val', 'val', 'val', 'val', FROM_UNIXTIME(0), 'val', 'val'
)`,
},
},
Expand Down

0 comments on commit 84f49b2

Please sign in to comment.