Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Online DDL: followups in multiple trajectories #6901

Merged
merged 16 commits into from
Nov 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go/vt/schema/online_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ type OnlineDDL struct {
Options string `json:"options,omitempty"`
RequestTime int64 `json:"time_created,omitempty"`
Status OnlineDDLStatus `json:"status,omitempty"`
TabletAlias string `json:"tablet,omitempty"`
Retries int64 `json:"retries,omitempty"`
}

// FromJSON creates an OnlineDDL from json
Expand Down
111 changes: 93 additions & 18 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
Expand Down Expand Up @@ -110,6 +111,7 @@ type Executor struct {
pool *connpool.Pool
tabletTypeFunc func() topodatapb.TabletType
ts *topo.Server
tabletAlias *topodatapb.TabletAlias

keyspace string
shard string
Expand Down Expand Up @@ -143,9 +145,10 @@ func PTOSCFileName() (fileName string, isOverride bool) {
}

// NewExecutor creates a new gh-ost executor.
func NewExecutor(env tabletenv.Env, ts *topo.Server, tabletTypeFunc func() topodatapb.TabletType) *Executor {
func NewExecutor(env tabletenv.Env, tabletAlias topodatapb.TabletAlias, ts *topo.Server, tabletTypeFunc func() topodatapb.TabletType) *Executor {
return &Executor{
env: env,
env: env,
tabletAlias: &tabletAlias,

pool: connpool.NewPool(env, "ExecutorPool", tabletenv.ConnPoolConfig{
Size: 1,
Expand All @@ -168,6 +171,11 @@ func (e *Executor) execQuery(ctx context.Context, query string) (result *sqltype
return conn.Exec(ctx, query, math.MaxInt32, true)
}

// TabletAliasString returns this executor's tablet alias formatted as a
// string by topoproto.TabletAliasString.
func (e *Executor) TabletAliasString() string {
	alias := e.tabletAlias
	return topoproto.TabletAliasString(alias)
}

func (e *Executor) initSchema(ctx context.Context) error {
e.initMutex.Lock()
defer e.initMutex.Unlock()
Expand Down Expand Up @@ -403,7 +411,7 @@ export ONLINE_DDL_PASSWORD
}
onHookContent := func(status schema.OnlineDDLStatus) string {
return fmt.Sprintf(`#!/bin/bash
curl -s 'http://localhost:%d/schema-migration/report-status?uuid=%s&status=%s&dryrun='"$GH_OST_DRY_RUN"
curl -s 'http://localhost:%d/schema-migration/report-status?uuid=%s&status=%s&dryrun='"$GH_OST_DRY_RUN"'&progress='"$GH_OST_PROGRESS"
`, *servenv.Port, onlineDDL.UUID, string(status))
}
if _, err := createTempScript(tempDir, "gh-ost-on-startup", onHookContent(schema.OnlineDDLStatusRunning)); err != nil {
Expand Down Expand Up @@ -486,7 +494,7 @@ curl -s 'http://localhost:%d/schema-migration/report-status?uuid=%s&status=%s&dr
fmt.Sprintf("--serve-socket-file=%s", serveSocketFile),
fmt.Sprintf("--hooks-path=%s", tempDir),
fmt.Sprintf(`--hooks-hint-token=%s`, onlineDDL.UUID),
fmt.Sprintf(`--throttle-http=http://localhost:%d/throttler/check?p=low`, *servenv.Port),
fmt.Sprintf(`--throttle-http=http://localhost:%d/throttler/check?app=online-ddl:gh-ost:%s&p=low`, *servenv.Port, onlineDDL.UUID),
fmt.Sprintf(`--database=%s`, e.dbName),
fmt.Sprintf(`--table=%s`, onlineDDL.Table),
fmt.Sprintf(`--alter=%s`, alterOptions),
Expand Down Expand Up @@ -621,7 +629,7 @@ export MYSQL_PWD
my ($self, %args) = @_;

return sub {
if (head("http://localhost:{{VTTABLET_PORT}}/throttler/check?p=low")) {
if (head("http://localhost:{{VTTABLET_PORT}}/throttler/check?app=online-ddl:pt-osc:{{MIGRATION_UUID}}&p=low")) {
# Got HTTP 200 OK, means throttler is happy
return 0;
} else {
Expand Down Expand Up @@ -785,14 +793,16 @@ func (e *Executor) readMigration(ctx context.Context, uuid string) (onlineDDL *s
return nil, ErrMigrationNotFound
}
onlineDDL = &schema.OnlineDDL{
Keyspace: row["keyspace"].ToString(),
Table: row["mysql_table"].ToString(),
Schema: row["mysql_schema"].ToString(),
SQL: row["migration_statement"].ToString(),
UUID: row["migration_uuid"].ToString(),
Strategy: sqlparser.DDLStrategy(row["strategy"].ToString()),
Options: row["options"].ToString(),
Status: schema.OnlineDDLStatus(row["migration_status"].ToString()),
Keyspace: row["keyspace"].ToString(),
Table: row["mysql_table"].ToString(),
Schema: row["mysql_schema"].ToString(),
SQL: row["migration_statement"].ToString(),
UUID: row["migration_uuid"].ToString(),
Strategy: sqlparser.DDLStrategy(row["strategy"].ToString()),
Options: row["options"].ToString(),
Status: schema.OnlineDDLStatus(row["migration_status"].ToString()),
Retries: row.AsInt64("retries", 0),
TabletAlias: row["tablet"].ToString(),
}
return onlineDDL, nil
}
Expand Down Expand Up @@ -1042,7 +1052,6 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
return countRunnning, err
}
for _, row := range r.Named().Rows {
// A pt-osc UUID is found which claims to be 'running'. Is it?
uuid := row["migration_uuid"].ToString()
// Since pt-osc doesn't have a "liveness" plugin entry point, we do it externally:
// if the process is alive, we update the `liveness_timestamp` for this migration.
Expand Down Expand Up @@ -1086,6 +1095,12 @@ func (e *Executor) reviewStaleMigrations(ctx context.Context) error {
return err
}
}
if onlineDDL.TabletAlias != e.TabletAliasString() {
// This means another tablet started the migration, and the migration has failed due to the tablet failure (e.g. master failover)
if err := e.updateTabletFailure(ctx, onlineDDL.UUID); err != nil {
return err
}
}
if err := e.updateMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusFailed); err != nil {
return err
}
Expand All @@ -1094,6 +1109,13 @@ func (e *Executor) reviewStaleMigrations(ctx context.Context) error {
return nil
}

// retryTabletFailureMigrations puts a migration that failed because of a tablet
// failure (e.g. a failover) back in the queue. Candidates are selected with
// sqlWhereTabletFailure: flagged tablet_failure, status 'failed', never retried.
// The underlying retry statement carries LIMIT 1, so each call re-queues at
// most one migration.
func (e *Executor) retryTabletFailureMigrations(ctx context.Context) error {
	if _, err := e.retryMigration(ctx, sqlWhereTabletFailure); err != nil {
		return err
	}
	return nil
}

// gcArtifactTable garbage-collects migration artifacts from completed/failed migrations
func (e *Executor) gcArtifactTable(ctx context.Context, artifactTable string) error {
tableExists, err := e.tableExists(ctx, artifactTable)
Expand Down Expand Up @@ -1156,11 +1178,13 @@ func (e *Executor) onMigrationCheckTick() {
return
}
ctx := context.Background()

if err := e.initSchema(ctx); err != nil {
log.Error(err)
return
}
if err := e.retryTabletFailureMigrations(ctx); err != nil {
log.Error(err)
}
if err := e.scheduleNextMigration(ctx); err != nil {
log.Error(err)
}
Expand Down Expand Up @@ -1244,6 +1268,22 @@ func (e *Executor) updateArtifacts(ctx context.Context, uuid string, artifacts .
return err
}

// updateTabletFailure flags the migration identified by uuid as having failed
// due to a tablet failure, by setting tablet_failure=1 on its row.
func (e *Executor) updateTabletFailure(ctx context.Context, uuid string) error {
	query := sqlparser.BuildParsedQuery(sqlUpdateTabletFailure, "_vt", ":migration_uuid")
	boundQuery, err := query.GenerateQuery(
		map[string]*querypb.BindVariable{
			"migration_uuid": sqltypes.StringBindVariable(uuid),
		},
		nil,
	)
	if err != nil {
		return err
	}
	if _, err := e.execQuery(ctx, boundQuery); err != nil {
		return err
	}
	return nil
}

func (e *Executor) updateMigrationStatus(ctx context.Context, uuid string, status schema.OnlineDDLStatus) error {
parsed := sqlparser.BuildParsedQuery(sqlUpdateMigrationStatus, "_vt",
":migration_status",
Expand All @@ -1261,18 +1301,49 @@ func (e *Executor) updateMigrationStatus(ctx context.Context, uuid string, statu
return err
}

// updateMigrationProgress writes the reported progress value onto the row of
// the migration identified by uuid. Values of zero or below are not written
// and nil is returned.
func (e *Executor) updateMigrationProgress(ctx context.Context, uuid string, progress float64) error {
	// Progress begins at 0 and only ever grows, so a 0 is either the true
	// current value or a report that carries no information. Neither case
	// warrants a write.
	if progress <= 0 {
		return nil
	}

	query := sqlparser.BuildParsedQuery(sqlUpdateMigrationProgress, "_vt",
		":migration_progress",
		":migration_uuid",
	)
	boundQuery, err := query.GenerateQuery(
		map[string]*querypb.BindVariable{
			"migration_progress": sqltypes.Float64BindVariable(progress),
			"migration_uuid":     sqltypes.StringBindVariable(uuid),
		},
		nil,
	)
	if err != nil {
		return err
	}
	if _, err := e.execQuery(ctx, boundQuery); err != nil {
		return err
	}
	return nil
}

// retryMigration re-queues a failed or cancelled migration that also matches
// whereExpr. Per sqlRetryMigration, the row is moved back to 'queued', is
// assigned to this executor's tablet, has its retries counter incremented and
// its tablet_failure flag and timestamps reset; the statement carries LIMIT 1,
// so at most one migration is affected per call.
//
// whereExpr is substituted into the statement's WHERE clause as SQL text, not
// as a bind variable, so callers must pass only trusted expressions.
//
// The call is serialized with other migration operations via migrationMutex.
// It returns the result of the UPDATE, or an error if the query could not be
// generated or executed.
func (e *Executor) retryMigration(ctx context.Context, whereExpr string) (result *sqltypes.Result, err error) {
	e.migrationMutex.Lock()
	defer e.migrationMutex.Unlock()

	parsed := sqlparser.BuildParsedQuery(sqlRetryMigration, "_vt", ":tablet", whereExpr)
	bindVars := map[string]*querypb.BindVariable{
		"tablet": sqltypes.StringBindVariable(e.TabletAliasString()),
	}
	bound, err := parsed.GenerateQuery(bindVars, nil)
	if err != nil {
		// Do not execute a query that failed to bind; surface the error instead
		// of letting the execQuery result overwrite it.
		return nil, err
	}
	return e.execQuery(ctx, bound)
}

// OnSchemaMigrationStatus is called by TabletServer's API, which is invoked by a running gh-ost migration's hooks.
func (e *Executor) OnSchemaMigrationStatus(ctx context.Context, uuidParam, statusParam, dryrunParam string) (err error) {
func (e *Executor) OnSchemaMigrationStatus(ctx context.Context, uuidParam, statusParam, dryrunParam, progressParam string) (err error) {
status := schema.OnlineDDLStatus(statusParam)
dryRun := (dryrunParam == "true")
var progressPct float64
if pct, err := strconv.ParseFloat(progressParam, 32); err == nil {
progressPct = pct
}

if dryRun && status != schema.OnlineDDLStatusFailed {
// We don't consider dry-run reports unless there's a failure
Expand Down Expand Up @@ -1305,6 +1376,9 @@ func (e *Executor) OnSchemaMigrationStatus(ctx context.Context, uuidParam, statu
if err = e.updateMigrationStatus(ctx, uuidParam, status); err != nil {
return err
}
if err = e.updateMigrationProgress(ctx, uuidParam, progressPct); err != nil {
return err
}

return nil
}
Expand Down Expand Up @@ -1335,6 +1409,7 @@ func (e *Executor) VExec(ctx context.Context, vx *vexec.TabletVExec) (qr *queryp
// We can fill them in.
vx.ReplaceInsertColumnVal("shard", vx.ToStringVal(e.shard))
vx.ReplaceInsertColumnVal("mysql_schema", vx.ToStringVal(e.dbName))
vx.AddOrReplaceInsertColumnVal("tablet", vx.ToStringVal(e.TabletAliasString()))
return response(e.execQuery(ctx, vx.Query))
case *sqlparser.Update:
match, err := sqlparser.QueryMatchesTemplates(vx.Query, vexecUpdateTemplates)
Expand Down
47 changes: 42 additions & 5 deletions go/vt/vttablet/onlineddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,17 @@ const (
artifacts varchar(1024) NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY uuid_idx (migration_uuid),
KEY keyspace_shard_idx (keyspace,shard),
KEY keyspace_shard_idx (keyspace(64),shard(64)),
KEY status_idx (migration_status, liveness_timestamp),
KEY cleanup_status_idx (cleanup_timestamp, migration_status)
) engine=InnoDB DEFAULT CHARSET=utf8mb4`
alterSchemaMigrationsTableRetries = "ALTER TABLE %s.schema_migrations add column retries int unsigned NOT NULL DEFAULT 0"
alterSchemaMigrationsTableTablet = "ALTER TABLE %s.schema_migrations add column tablet varchar(128) NOT NULL DEFAULT ''"
alterSchemaMigrationsTableArtifacts = "ALTER TABLE %s.schema_migrations modify artifacts TEXT NOT NULL"
alterSchemaMigrationsTableTabletFailure = "ALTER TABLE %s.schema_migrations add column tablet_failure tinyint unsigned NOT NULL DEFAULT 0"
alterSchemaMigrationsTableTabletFailureIndex = "ALTER TABLE %s.schema_migrations add KEY tablet_failure_idx (tablet_failure, migration_status, retries)"
alterSchemaMigrationsTableProgress = "ALTER TABLE %s.schema_migrations add column progress float NOT NULL DEFAULT 0"

sqlScheduleSingleMigration = `UPDATE %s.schema_migrations
SET
migration_status='ready',
Expand All @@ -65,6 +72,11 @@ const (
WHERE
migration_uuid=%a
`
sqlUpdateMigrationProgress = `UPDATE %s.schema_migrations
SET progress=%a
WHERE
migration_uuid=%a
`
sqlUpdateMigrationStartedTimestamp = `UPDATE %s.schema_migrations
SET started_timestamp=IFNULL(started_timestamp, NOW())
WHERE
Expand All @@ -81,22 +93,36 @@ const (
migration_uuid=%a
`
sqlUpdateArtifacts = `UPDATE %s.schema_migrations
SET artifacts=%a
SET artifacts=concat(%a, ',', artifacts)
WHERE
migration_uuid=%a
`
sqlUpdateTabletFailure = `UPDATE %s.schema_migrations
SET tablet_failure=1
WHERE
migration_uuid=%a
`
sqlRetryMigration = `UPDATE %s.schema_migrations
SET
migration_status='queued',
tablet=%a,
retries=retries + 1,
tablet_failure=0,
ready_timestamp=NULL,
started_timestamp=NULL,
liveness_timestamp=NULL,
completed_timestamp=NULL
completed_timestamp=NULL,
cleanup_timestamp=NULL
WHERE
migration_status IN ('failed', 'cancelled')
AND (%s)
LIMIT 1
`
sqlWhereTabletFailure = `
tablet_failure=1
AND migration_status='failed'
AND retries=0
`
sqlSelectRunningMigrations = `SELECT
migration_uuid
FROM %s.schema_migrations
Expand Down Expand Up @@ -141,7 +167,9 @@ const (
liveness_timestamp,
completed_timestamp,
migration_status,
log_path
log_path,
retries,
tablet
FROM %s.schema_migrations
WHERE
migration_uuid=%a
Expand All @@ -161,7 +189,10 @@ const (
started_timestamp,
liveness_timestamp,
completed_timestamp,
migration_status
migration_status,
log_path,
retries,
tablet
FROM %s.schema_migrations
WHERE
migration_status='ready'
Expand Down Expand Up @@ -201,4 +232,10 @@ var (
// applyDDL is the ordered list of DDL statements for the "_vt" sidecar schema
// used by online DDL. It starts with the sidecar database and the
// schema_migrations table creation statements, followed by ALTER statements
// that bring schema_migrations up to date: the retries, tablet, tablet_failure
// and progress columns, the artifacts column changed to TEXT, and the
// tablet_failure_idx index.
// NOTE(review): the ALTER statements are not conditional on the column/index
// already existing — confirm the caller tolerates errors on repeated runs.
var applyDDL = []string{
fmt.Sprintf(sqlCreateSidecarDB, "_vt"),
fmt.Sprintf(sqlCreateSchemaMigrationsTable, "_vt"),
fmt.Sprintf(alterSchemaMigrationsTableRetries, "_vt"),
fmt.Sprintf(alterSchemaMigrationsTableTablet, "_vt"),
fmt.Sprintf(alterSchemaMigrationsTableArtifacts, "_vt"),
fmt.Sprintf(alterSchemaMigrationsTableTabletFailure, "_vt"),
fmt.Sprintf(alterSchemaMigrationsTableTabletFailureIndex, "_vt"),
fmt.Sprintf(alterSchemaMigrationsTableProgress, "_vt"),
}
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to
}
return tsv.sm.Target().TabletType
}
tsv.onlineDDLExecutor = onlineddl.NewExecutor(tsv, topoServer, tabletTypeFunc)
tsv.onlineDDLExecutor = onlineddl.NewExecutor(tsv, alias, topoServer, tabletTypeFunc)
tsv.lagThrottler = throttle.NewThrottler(tsv, topoServer, tabletTypeFunc)
tsv.tableGC = gc.NewTableGC(tsv, topoServer, tabletTypeFunc, tsv.lagThrottler)

Expand Down Expand Up @@ -1528,7 +1528,8 @@ func (tsv *TabletServer) registerTwopczHandler() {
func (tsv *TabletServer) registerMigrationStatusHandler() {
tsv.exporter.HandleFunc("/schema-migration/report-status", func(w http.ResponseWriter, r *http.Request) {
ctx := tabletenv.LocalContext()
if err := tsv.onlineDDLExecutor.OnSchemaMigrationStatus(ctx, r.URL.Query().Get("uuid"), r.URL.Query().Get("status"), r.URL.Query().Get("dryrun")); err != nil {
query := r.URL.Query()
if err := tsv.onlineDDLExecutor.OnSchemaMigrationStatus(ctx, query.Get("uuid"), query.Get("status"), query.Get("dryrun"), query.Get("progress")); err != nil {
http.Error(w, fmt.Sprintf("not ok: %v", err), http.StatusInternalServerError)
return
}
Expand Down
Loading