Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Prevent Orphaned VDiff2 Jobs #11768

Merged
merged 12 commits into from
Dec 4, 2022
16 changes: 14 additions & 2 deletions go/mysql/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ const (

// Error codes for client-side errors.
// Originally found in include/mysql/errmsg.h and
// https://dev.mysql.com/doc/refman/5.7/en/error-messages-client.html
// https://dev.mysql.com/doc/mysql-errors/en/client-error-reference.html
const (
// CRUnknownError is CR_UNKNOWN_ERROR
CRUnknownError = 2000
Expand All @@ -286,6 +286,10 @@ const (
// This is returned if a connection via a TCP socket fails.
CRConnHostError = 2003

// CRUnknownHost is CR_UNKNOWN_HOST
// This is returned if the host name cannot be resolved.
CRUnknownHost = 2005

// CRServerGone is CR_SERVER_GONE_ERROR.
// This is returned if the client tries to send a command but it fails.
CRServerGone = 2006
Expand Down Expand Up @@ -325,7 +329,7 @@ const (

// Error codes for server-side errors.
// Originally found in include/mysql/mysqld_error.h and
// https://dev.mysql.com/doc/refman/5.7/en/error-messages-server.html
// https://dev.mysql.com/doc/mysql-errors/en/server-error-reference.html
// The below are in sorted order by value, grouped by vterror code they should be bucketed into.
// See above reference for more information on each code.
const (
Expand Down Expand Up @@ -543,6 +547,9 @@ const (
ERJSONDocumentTooDeep = 3157
ERWrongValue = 1525

// ERQueryTimeout is ER_QUERY_TIMEOUT.
// This is returned when a query exceeds its maximum execution time.
ERQueryTimeout = 3024

ErrCantCreateGeometryObject = 1416
ErrGISDataWrongEndianess = 3055
ErrNotImplementedForCartesianSRS = 3704
Expand Down Expand Up @@ -677,8 +684,12 @@ func IsEphemeralError(err error) bool {
CRConnHostError,
CRMalformedPacket,
CRNamedPipeStateError,
CRServerHandshakeErr,
CRServerGone,
CRServerLost,
CRSSLConnectionError,
CRUnknownError,
CRUnknownHost,
ERCantCreateThread,
ERDiskFull,
ERForcingClose,
Expand All @@ -689,6 +700,7 @@ func IsEphemeralError(err error) bool {
ERInternalError,
ERLockDeadlock,
ERLockWaitTimeout,
ERQueryTimeout,
EROutOfMemory,
EROutOfResources,
EROutOfSortMemory,
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/vdiff2.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ func buildVDiff2SingleSummary(wr *wrangler.Wrangler, keyspace, workflow, uuid st
// on every shard.
if shardStateCounts[vdiff.StoppedState] > 0 {
summary.State = vdiff.StoppedState
} else if tableStateCounts[vdiff.ErrorState] > 0 {
} else if shardStateCounts[vdiff.ErrorState] > 0 || tableStateCounts[vdiff.ErrorState] > 0 {
summary.State = vdiff.ErrorState
} else if tableStateCounts[vdiff.StartedState] > 0 {
summary.State = vdiff.StartedState
Expand Down
67 changes: 60 additions & 7 deletions go/vt/vttablet/tabletmanager/vdiff/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"strings"
"time"

"vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/vterrors"
Expand Down Expand Up @@ -127,18 +128,20 @@ func (ct *controller) run(ctx context.Context) {
row := qr.Named().Row()
state := VDiffState(strings.ToLower(row["state"].ToString()))
switch state {
case PendingState:
log.Infof("Starting vdiff %s", ct.uuid)
case PendingState, StartedState:
action := "Starting"
if state == StartedState {
action = "Restarting"
}
log.Infof("%s vdiff %s", action, ct.uuid)
if err := ct.start(ctx, dbClient); err != nil {
log.Errorf("Encountered an error for vdiff %s: %s", ct.uuid, err)
insertVDiffLog(ctx, dbClient, ct.id, fmt.Sprintf("Error: %s", err))
if err = ct.updateState(dbClient, ErrorState, err); err != nil {
log.Errorf("Encountered an error marking vdiff %s as errored: %v", ct.uuid, err)
if err := ct.saveErrorState(ctx, err); err != nil {
log.Errorf("Unable to save error state for vdiff %s; giving up because %s", ct.uuid, err.Error())
}
return
}
default:
log.Infof("VDiff %s was not marked as pending, doing nothing", state)
log.Infof("VDiff %s was not marked as runnable (state: %s), doing nothing", ct.uuid, state)
}
}

Expand Down Expand Up @@ -271,3 +274,53 @@ func (ct *controller) validate() error {
// TODO: check if vreplication workflow has errors, what else?
return nil
}

// saveErrorState records saveErr against this vdiff by moving it to
// ErrorState and writing a matching vdiff log entry. Each attempt uses
// a freshly connected filtered DB client that is closed when the
// attempt finishes.
//
// A failed attempt is retried indefinitely. The wait between attempts
// starts at 100ms and grows by a factor of 1.5 after each failure, up
// to a ceiling of 60s, so that a persistently unavailable database is
// not flooded with new connections.
//
// Retrying stops only when ctx is done (reported as the engine shutting
// down) or when ct.done is closed (reported as the vdiff having been
// stopped). In both cases the returned error explains why the retries
// were abandoned; the error state itself was NOT saved. Per the original
// author's note, such a vdiff is still in the started state and will be
// restarted when the engine is next opened.
//
// It returns nil once the state update has succeeded.
func (ct *controller) saveErrorState(ctx context.Context, saveErr error) error {
	const (
		initialDelay  = 100 * time.Millisecond
		maxDelay      = 60 * time.Second
		backoffFactor = 1.5
	)

	// persist performs a single, self-contained attempt to write the
	// error state using its own short-lived connection.
	persist := func() error {
		client := ct.vde.dbClientFactoryFiltered()
		if connErr := client.Connect(); connErr != nil {
			return connErr
		}
		defer client.Close()

		if updateErr := ct.updateState(client, ErrorState, saveErr); updateErr != nil {
			return updateErr
		}
		insertVDiffLog(ctx, client, ct.id, fmt.Sprintf("Error: %s", saveErr))
		return nil
	}

	delay := initialDelay
	for {
		attemptErr := persist()
		if attemptErr == nil {
			return nil
		}
		log.Warningf("Failed to persist vdiff error state: %v. Will retry in %s", attemptErr, delay.String())

		// Sleep for the current delay unless we are told to give up first.
		select {
		case <-ctx.Done():
			return fmt.Errorf("engine is shutting down")
		case <-ct.done:
			return fmt.Errorf("vdiff was stopped")
		case <-time.After(delay):
		}

		// Grow the delay for the next attempt, never exceeding the ceiling.
		if delay < maxDelay {
			delay = time.Duration(float64(delay) * backoffFactor)
			if delay > maxDelay {
				delay = maxDelay
			}
		}
	}
}
35 changes: 20 additions & 15 deletions go/vt/vttablet/tabletmanager/vdiff/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,16 @@ func (vde *Engine) Open(ctx context.Context, vre *vreplication.Engine) {
}

func (vde *Engine) openLocked(ctx context.Context) error {
// Start any pending VDiffs
rows, err := vde.getPendingVDiffs(ctx)
// This should never happen
if len(vde.controllers) > 0 {
log.Warningf("VDiff Engine invalid state detected: %d controllers existed when opening; resetting state", len(vde.controllers))
vde.resetControllers()
}

// At this point the tablet has no controllers running. So
// we want to start any VDiffs that have not been explicitly
// stopped or otherwise finished.
rows, err := vde.getVDiffsToRun(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -219,10 +227,7 @@ func (vde *Engine) Close() {
vde.cancel()

// We still have to wait for all controllers to stop.
for _, ct := range vde.controllers {
ct.Stop()
}
vde.controllers = make(map[int64]*controller)
vde.resetControllers()

// Wait for long-running functions to exit.
vde.wg.Wait()
Expand All @@ -232,14 +237,7 @@ func (vde *Engine) Close() {
log.Infof("VDiff Engine: closed")
}

// getDBClient returns a new DB client built by one of the engine's two
// client factories: the DBA factory when isAdmin is true, and the
// filtered factory otherwise.
func (vde *Engine) getDBClient(isAdmin bool) binlogplayer.DBClient {
	if !isAdmin {
		return vde.dbClientFactoryFiltered()
	}
	return vde.dbClientFactoryDba()
}

func (vde *Engine) getPendingVDiffs(ctx context.Context) (*sqltypes.Result, error) {
func (vde *Engine) getVDiffsToRun(ctx context.Context) (*sqltypes.Result, error) {
dbClient := vde.dbClientFactoryFiltered()
if err := dbClient.Connect(); err != nil {
return nil, err
Expand All @@ -248,7 +246,7 @@ func (vde *Engine) getPendingVDiffs(ctx context.Context) (*sqltypes.Result, erro

// We have to use ExecIgnore here so as not to block quick tablet state
// transitions from primary to non-primary when starting the engine
qr, err := withDDL.ExecIgnore(ctx, sqlGetPendingVDiffs, dbClient.ExecuteFetch)
qr, err := withDDL.ExecIgnore(ctx, sqlGetVDiffsToRun, dbClient.ExecuteFetch)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -343,3 +341,10 @@ func (vde *Engine) retryErroredVDiffs() {
}
}
}

// resetControllers calls Stop on every controller currently tracked by
// the engine and then replaces the controller map with a new, empty one.
func (vde *Engine) resetControllers() {
	for _, running := range vde.controllers {
		running.Stop()
	}
	// Start over with a fresh map rather than deleting entries one by one.
	vde.controllers = make(map[int64]*controller)
}
Loading