diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go
index 563a1fb2f06..6b9a2c4ddc8 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/controller.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go
@@ -46,7 +46,9 @@ var (
 	_ = flag.Duration("vreplication_healthcheck_topology_refresh", 30*time.Second, "refresh interval for re-reading the topology")
 	_ = flag.Duration("vreplication_healthcheck_retry_delay", 5*time.Second, "healthcheck retry delay")
 	_ = flag.Duration("vreplication_healthcheck_timeout", 1*time.Minute, "healthcheck retry delay")
-	retryDelay = flag.Duration("vreplication_retry_delay", 5*time.Second, "delay before retrying a failed binlog connection")
+	retryDelay = flag.Duration("vreplication_retry_delay", 5*time.Second, "delay before retrying a failed workflow event in the replication phase")
+
+	maxTimeToRetryError = flag.Duration("vreplication_max_time_to_retry_on_error", 15*time.Minute, "stop automatically retrying when we've had consecutive failures with the same error for this long after the first occurrence")
 )
 
 // controller is created by Engine. Members are initialized upfront.
@@ -69,6 +71,8 @@ type controller struct {
 
 	// The following fields are updated after start. So, they need synchronization.
 	sourceTablet sync2.AtomicString
+
+	lastWorkflowError *lastError
 }
 
 // newController creates a new controller. Unless a stream is explicitly 'Stopped',
@@ -77,13 +81,15 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor
 	if blpStats == nil {
 		blpStats = binlogplayer.NewStats()
 	}
+
 	ct := &controller{
-		vre:             vre,
-		dbClientFactory: dbClientFactory,
-		mysqld:          mysqld,
-		blpStats:        blpStats,
-		done:            make(chan struct{}),
-		source:          &binlogdatapb.BinlogSource{},
+		vre:               vre,
+		dbClientFactory:   dbClientFactory,
+		mysqld:            mysqld,
+		blpStats:          blpStats,
+		done:              make(chan struct{}),
+		source:            &binlogdatapb.BinlogSource{},
+		lastWorkflowError: newLastError("VReplication Controller", *maxTimeToRetryError),
 	}
 	log.Infof("creating controller with cell: %v, tabletTypes: %v, and params: %v", cell, tabletTypesStr, params)
 
@@ -95,9 +101,10 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor
 	ct.id = uint32(id)
 	ct.workflow = params["workflow"]
 
-	blpStats.State.Set(params["state"])
-	// Nothing to do if replication is stopped.
-	if params["state"] == binlogplayer.BlpStopped {
+	state := params["state"]
+	blpStats.State.Set(state)
+	// Nothing to do if replication is stopped or is known to have an unrecoverable error.
+	if state == binlogplayer.BlpStopped || state == binlogplayer.BlpError {
 		ct.cancel = func() {}
 		close(ct.done)
 		return ct, nil
@@ -161,8 +168,9 @@ func (ct *controller) run(ctx context.Context) {
 			return
 		default:
 		}
-		binlogplayer.LogError(fmt.Sprintf("error in stream %v, retrying after %v", ct.id, *retryDelay), err)
 
+		ct.blpStats.ErrorCounts.Add([]string{"Stream Error"}, 1)
+		binlogplayer.LogError(fmt.Sprintf("error in stream %v, retrying after %v", ct.id, *retryDelay), err)
 		timer := time.NewTimer(*retryDelay)
 		select {
 		case <-ctx.Done():
@@ -270,18 +278,16 @@ func (ct *controller) runBlp(ctx context.Context) (err error) {
 	vr := newVReplicator(ct.id, ct.source, vsClient, ct.blpStats, dbClient, ct.mysqld, ct.vre)
 	err = vr.Replicate(ctx)
-	if isUnrecoverableError(err) {
-		settings, _, errSetting := vr.readSettings(ctx)
-		if errSetting != nil {
-			return err // yes, err and not errSetting.
-		}
-		if settings.WorkflowType == int64(binlogdatapb.VReplicationWorkflowType_ONLINEDDL) {
-			// Specific to OnlineDDL, if we encounter an "unrecoverable error", we change the migration state into Error and then we quit the workflow
-			if errSetState := vr.setState(binlogplayer.BlpError, err.Error()); errSetState != nil {
-				return err // yes, err and not errSetState.
-			}
-			return nil // this will cause vreplicate to quit the workflow
-		}
-	}
+
+	ct.lastWorkflowError.record(err)
+	// Park the workflow in the Error state if this is a MySQL error that we know needs manual
+	// intervention, or if an otherwise retryable error has persisted beyond the retry limit (maxTimeToRetryError).
+	if isUnrecoverableError(err) || !ct.lastWorkflowError.shouldRetry() {
+		log.Errorf("vreplication stream %d going into error state due to %+v", ct.id, err)
+		if errSetState := vr.setState(binlogplayer.BlpError, err.Error()); errSetState != nil {
+			return err // yes, err and not errSetState.
+		}
+		return nil // this will cause the vreplicator to quit the workflow
+	}
 	return err
 }
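The interplay between run() and runBlp() above is worth making explicit: run() retries runBlp() after *retryDelay whenever it returns a non-nil error, which is why runBlp() swallows the error once it has parked the workflow in the Error state; returning nil is what stops the retry loop. Below is a minimal, self-contained sketch of that contract (retryForever and attempt are hypothetical names, not code from this patch):

package vreplication

import (
	"context"
	"time"
)

// retryForever illustrates the retry contract between run() and runBlp():
// a nil result ends the loop (the stream finished, or the workflow was
// deliberately parked in the Error state), while a non-nil result schedules
// another attempt after retryDelay.
func retryForever(ctx context.Context, attempt func(context.Context) error, retryDelay time.Duration) {
	for {
		if err := attempt(ctx); err == nil {
			return
		}
		select {
		case <-ctx.Done():
			return
		case <-time.After(retryDelay):
			// fall through and retry
		}
	}
}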
diff --git a/go/vt/vttablet/tabletmanager/vreplication/last_error.go b/go/vt/vttablet/tabletmanager/vreplication/last_error.go
new file mode 100644
index 00000000000..a3080973c92
--- /dev/null
+++ b/go/vt/vttablet/tabletmanager/vreplication/last_error.go
@@ -0,0 +1,71 @@
+/*
+Copyright 2022 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package vreplication
+
+import (
+	"sync"
+	"time"
+
+	"vitess.io/vitess/go/vt/log"
+	"vitess.io/vitess/go/vt/vterrors"
+)
+
+/*
+ * lastError tracks the most recent error for any ongoing process and how long it has persisted.
+ * The err field should be a vterror so as to ensure we have meaningful error codes, causes, stack
+ * traces, etc.
+ */
+type lastError struct {
+	name           string
+	err            error
+	firstSeen      time.Time
+	mu             sync.Mutex
+	maxTimeInError time.Duration // if error persists for this long, shouldRetry() will return false
+}
+
+func newLastError(name string, maxTimeInError time.Duration) *lastError {
+	return &lastError{
+		name:           name,
+		maxTimeInError: maxTimeInError,
+	}
+}
+
+func (le *lastError) record(err error) {
+	le.mu.Lock()
+	defer le.mu.Unlock()
+	if err == nil {
+		le.err = nil
+		le.firstSeen = time.Time{}
+		return
+	}
+	if !vterrors.Equals(err, le.err) {
+		le.firstSeen = time.Now()
+		le.err = err
+	}
+	// The error is unchanged so we don't need to do anything
+}
+
+func (le *lastError) shouldRetry() bool {
+	le.mu.Lock()
+	defer le.mu.Unlock()
+	if !le.firstSeen.IsZero() && time.Since(le.firstSeen) > le.maxTimeInError {
+		log.Errorf("VReplication encountered the same error continuously since %s, we will assume this is a non-recoverable error and will not retry anymore; the workflow will need to be manually restarted once error '%s' has been addressed",
+			le.firstSeen.UTC(), le.err)
+		return false
+	}
+	return true
+}
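On its own, lastError is a small reusable policy object: record every outcome, then ask shouldRetry() before the next attempt. Note that record() compares errors with vterrors.Equals rather than pointer identity, so a freshly allocated error with the same code and message does not reset firstSeen. A sketch of the intended call pattern follows (workUntilStuck and work are hypothetical names, not part of this patch):

package vreplication

import "time"

// workUntilStuck retries work() until it succeeds or until the same error
// has persisted past the limit tracked by lastError.
func workUntilStuck(work func() error, retryDelay time.Duration) error {
	le := newLastError("example", 15*time.Minute)
	for {
		err := work()
		le.record(err) // record(nil) clears both err and firstSeen
		if err == nil {
			return nil
		}
		if !le.shouldRetry() {
			return err // give up; operator intervention is needed
		}
		time.Sleep(retryDelay)
	}
}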
diff --git a/go/vt/vttablet/tabletmanager/vreplication/last_error_test.go b/go/vt/vttablet/tabletmanager/vreplication/last_error_test.go
new file mode 100644
index 00000000000..8d0e353478a
--- /dev/null
+++ b/go/vt/vttablet/tabletmanager/vreplication/last_error_test.go
@@ -0,0 +1,55 @@
+/*
+Copyright 2022 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package vreplication
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestLastError(t *testing.T) {
+	le := newLastError("test", 100*time.Millisecond)
+
+	t.Run("long running error", func(t *testing.T) {
+		err1 := fmt.Errorf("test1")
+		le.record(err1)
+		require.True(t, le.shouldRetry())
+		time.Sleep(150 * time.Millisecond)
+		require.False(t, le.shouldRetry())
+	})
+
+	t.Run("new long running error", func(t *testing.T) {
+		err2 := fmt.Errorf("test2")
+		le.record(err2)
+		require.True(t, le.shouldRetry())
+		for i := 1; i < 10; i++ {
+			le.record(err2)
+		}
+		require.True(t, le.shouldRetry())
+		time.Sleep(150 * time.Millisecond)
+		le.record(err2)
+		require.False(t, le.shouldRetry())
+	})
+
+	t.Run("no error", func(t *testing.T) {
+		le.record(nil)
+		require.True(t, le.shouldRetry())
+	})
+}
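The test above leans on real sleeps, which keeps it short but couples it to wall-clock timing. If those sleeps ever prove flaky, a conventional fix (not part of this patch) is to inject a clock into lastError. The sketch below assumes a hypothetical now func() time.Time field that record() and shouldRetry() would consult instead of calling time.Now() and time.Since() directly:

package vreplication

import (
	"fmt"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// TestLastErrorFakeClock is a hypothetical variant of TestLastError that
// advances a fake clock instead of sleeping. It assumes lastError gained a
// `now func() time.Time` field, which it does not have in this patch.
func TestLastErrorFakeClock(t *testing.T) {
	current := time.Now()
	le := newLastError("test", 100*time.Millisecond)
	le.now = func() time.Time { return current } // hypothetical clock injection

	le.record(fmt.Errorf("test1"))
	require.True(t, le.shouldRetry())
	current = current.Add(150 * time.Millisecond) // "wait" without sleeping
	require.False(t, le.shouldRetry())
}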
diff --git a/go/vt/vttablet/tabletmanager/vreplication/utils.go b/go/vt/vttablet/tabletmanager/vreplication/utils.go
index 4f433872e98..2a689c9b2ec 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/utils.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/utils.go
@@ -22,6 +22,8 @@ import (
 	"fmt"
 	"strconv"
 
+	"vitess.io/vitess/go/vt/log"
+
 	"vitess.io/vitess/go/mysql"
 
 	"vitess.io/vitess/go/vt/sqlparser"
@@ -161,6 +163,7 @@ func isUnrecoverableError(err error) bool {
 		mysql.ERInvalidCastToJSON,
 		mysql.ERJSONValueTooBig,
 		mysql.ERJSONDocumentTooDeep:
+		log.Errorf("Got unrecoverable error: %v", sqlErr)
 		return true
 	}
 	return false
diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go
index 99a8fb168d0..988f9c6237c 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go
@@ -2374,7 +2374,6 @@ func TestRestartOnVStreamEnd(t *testing.T) {
 		"/update _vt.vreplication set message='vstream ended'",
 	})
 	streamerEngine.Open()
-
 	execStatements(t, []string{
 		"insert into t1 values(2, 'aaa')",
 	})
diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
index a1252b9d603..c725be8d02b 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
@@ -210,8 +210,8 @@ func (vr *vreplicator) replicate(ctx context.Context) error {
 		if err != nil {
 			return err
 		}
-		// If any of the operations below changed state to Stopped, we should return.
-		if settings.State == binlogplayer.BlpStopped {
+		// If any of the operations below changed state to Stopped or Error, we should return.
+		if settings.State == binlogplayer.BlpStopped || settings.State == binlogplayer.BlpError {
 			return nil
 		}
 		switch {
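Taken together, the pieces form a single state machine: utils.go classifies permanently failing MySQL errors, last_error.go promotes errors that persist past vreplication_max_time_to_retry_on_error into permanent ones, controller.go parks the failing workflow in the Error state, and both newController and vreplicator.replicate refuse to run a workflow whose state is Stopped or Error. That last gate is the same check in two places; a hypothetical helper makes it easy to see (isParked does not exist in this patch):

package vreplication

import "vitess.io/vitess/go/vt/binlog/binlogplayer"

// isParked reports whether a workflow state means "do not run it": after this
// change, Error is treated like Stopped, and the workflow is left alone until
// an operator sets its state back to Running.
func isParked(state string) bool {
	return state == binlogplayer.BlpStopped || state == binlogplayer.BlpError
}

Once the underlying problem has been addressed, moving the workflow's state back to Running is what re-arms it; nothing in this patch does that automatically.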