Fail VReplication workflows on errors that persist and unrecoverable errors (vitessio#10429) (vitessio#783)

* Fail the workflow if the same error persists for too long. Also fail on unrecoverable errors in non-Online DDL workflows

Signed-off-by: Rohit Nayak <rohit@planetscale.com>

* Update the max time default to 15m; it was 1m for testing purposes

Signed-off-by: Rohit Nayak <rohit@planetscale.com>

* Leverage vterrors for Equals; attempt to address my own nits

Signed-off-by: Matt Lord <mattalord@gmail.com>

* sanity: validate the ranges of vreplication_retry_delay and vreplication_max_time_to_retry_on_error

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* Fix flags test

Signed-off-by: Rohit Nayak <rohit@planetscale.com>

* Remove leftover log.Flush()

Signed-off-by: Rohit Nayak <rohit@planetscale.com>

* Revert the min/max validation settings on the retry delay since they break unit tests that set the value to a very small value

Signed-off-by: Rohit Nayak <rohit@planetscale.com>

* Capitalize per request

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

Co-authored-by: Matt Lord <mattalord@gmail.com>
Co-authored-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

Co-authored-by: Rohit Nayak <57520317+rohit-nayak-ps@users.noreply.github.com>
Co-authored-by: Matt Lord <mattalord@gmail.com>
3 people authored Jun 23, 2022
1 parent 53e1b7a commit 71c61dc
Showing 6 changed files with 159 additions and 25 deletions.
50 changes: 28 additions & 22 deletions go/vt/vttablet/tabletmanager/vreplication/controller.go
@@ -46,7 +46,9 @@ var (
_ = flag.Duration("vreplication_healthcheck_topology_refresh", 30*time.Second, "refresh interval for re-reading the topology")
_ = flag.Duration("vreplication_healthcheck_retry_delay", 5*time.Second, "healthcheck retry delay")
_ = flag.Duration("vreplication_healthcheck_timeout", 1*time.Minute, "healthcheck retry delay")
retryDelay = flag.Duration("vreplication_retry_delay", 5*time.Second, "delay before retrying a failed binlog connection")
retryDelay = flag.Duration("vreplication_retry_delay", 5*time.Second, "delay before retrying a failed workflow event in the replication phase")

maxTimeToRetryError = flag.Duration("vreplication_max_time_to_retry_on_error", 15*time.Minute, "stop automatically retrying when we've had consecutive failures with the same error for this long after the first occurrence")
)

// controller is created by Engine. Members are initialized upfront.
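With the defaults above, a stream that keeps failing with the same error is retried roughly every 5 seconds and is abandoned after 15 minutes, i.e. on the order of 180 attempts. The Go sketch below condenses how the controller wires the two flags together; it is an illustration rather than an excerpt from this diff, and runOnce/setState are hypothetical stand-ins for the real controller/vreplicator methods.

package vreplication

import (
	"context"
	"time"

	"vitess.io/vitess/go/vt/binlogplayer"
)

// retryLoopSketch condenses the retry behavior added in this commit:
// every failure is recorded in a lastError, and the workflow is moved to the
// Error state (instead of being retried) once the error is classified as
// unrecoverable or the same error has persisted beyond maxTimeToRetryError.
func retryLoopSketch(ctx context.Context, runOnce func(context.Context) error, setState func(state, message string) error) {
	lastWorkflowError := newLastError("sketch", *maxTimeToRetryError) // 15m window by default
	for {
		err := runOnce(ctx)
		if err == nil {
			return
		}
		lastWorkflowError.record(err)
		if isUnrecoverableError(err) || !lastWorkflowError.shouldRetry() {
			_ = setState(binlogplayer.BlpError, err.Error()) // manual restart needed from here
			return
		}
		select {
		case <-ctx.Done():
			return
		case <-time.After(*retryDelay): // wait 5s (default) before the next attempt
		}
	}
}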
@@ -69,6 +71,8 @@ type controller struct {

// The following fields are updated after start. So, they need synchronization.
sourceTablet sync2.AtomicString

lastWorkflowError *lastError
}

// newController creates a new controller. Unless a stream is explicitly 'Stopped',
@@ -77,13 +81,15 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor
if blpStats == nil {
blpStats = binlogplayer.NewStats()
}

ct := &controller{
vre: vre,
dbClientFactory: dbClientFactory,
mysqld: mysqld,
blpStats: blpStats,
done: make(chan struct{}),
source: &binlogdatapb.BinlogSource{},
vre: vre,
dbClientFactory: dbClientFactory,
mysqld: mysqld,
blpStats: blpStats,
done: make(chan struct{}),
source: &binlogdatapb.BinlogSource{},
lastWorkflowError: newLastError("VReplication Controller", *maxTimeToRetryError),
}
log.Infof("creating controller with cell: %v, tabletTypes: %v, and params: %v", cell, tabletTypesStr, params)

@@ -95,9 +101,10 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor
ct.id = uint32(id)
ct.workflow = params["workflow"]

blpStats.State.Set(params["state"])
// Nothing to do if replication is stopped.
if params["state"] == binlogplayer.BlpStopped {
state := params["state"]
blpStats.State.Set(state)
// Nothing to do if replication is stopped or is known to have an unrecoverable error.
if state == binlogplayer.BlpStopped || state == binlogplayer.BlpError {
ct.cancel = func() {}
close(ct.done)
return ct, nil
@@ -161,8 +168,9 @@ func (ct *controller) run(ctx context.Context) {
return
default:
}
binlogplayer.LogError(fmt.Sprintf("error in stream %v, retrying after %v", ct.id, *retryDelay), err)

ct.blpStats.ErrorCounts.Add([]string{"Stream Error"}, 1)
binlogplayer.LogError(fmt.Sprintf("error in stream %v, retrying after %v", ct.id, *retryDelay), err)
timer := time.NewTimer(*retryDelay)
select {
case <-ctx.Done():
@@ -270,18 +278,16 @@ func (ct *controller) runBlp(ctx context.Context) (err error) {

vr := newVReplicator(ct.id, ct.source, vsClient, ct.blpStats, dbClient, ct.mysqld, ct.vre)
err = vr.Replicate(ctx)
if isUnrecoverableError(err) {
settings, _, errSetting := vr.readSettings(ctx)
if errSetting != nil {
return err // yes, err and not errSetting.
}
if settings.WorkflowType == int64(binlogdatapb.VReplicationWorkflowType_ONLINEDDL) {
// Specific to OnlineDDL, if we encounter an "unrecoverable error", we change the migration state into Error and then we quit the workflow
if errSetState := vr.setState(binlogplayer.BlpError, err.Error()); errSetState != nil {
return err // yes, err and not errSetState.
}
return nil // this will cause vreplicate to quit the workflow

ct.lastWorkflowError.record(err)
// If this is a mysql error that we know needs manual intervention OR
// we cannot identify this as non-recoverable, but it has persisted beyond the retry limit (maxTimeToRetryError)
if isUnrecoverableError(err) || !ct.lastWorkflowError.shouldRetry() {
log.Errorf("vreplication stream %d going into error state due to %+v", ct.id, err)
if errSetState := vr.setState(binlogplayer.BlpError, err.Error()); errSetState != nil {
return err // yes, err and not errSetState.
}
return nil // this will cause vreplicate to quit the workflow
}
return err
}
71 changes: 71 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/last_error.go
@@ -0,0 +1,71 @@
/*
Copyright 2022 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vreplication

import (
"sync"
"time"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vterrors"
)

/*
* lastError tracks the most recent error for any ongoing process and how long it has persisted.
* The err field should be a vterror so as to ensure we have meaningful error codes, causes, stack
* traces, etc.
*/
type lastError struct {
name string
err error
firstSeen time.Time
mu sync.Mutex
maxTimeInError time.Duration // if error persists for this long, shouldRetry() will return false
}

func newLastError(name string, maxTimeInError time.Duration) *lastError {
return &lastError{
name: name,
maxTimeInError: maxTimeInError,
}
}

func (le *lastError) record(err error) {
le.mu.Lock()
defer le.mu.Unlock()
if err == nil {
le.err = nil
le.firstSeen = time.Time{}
return
}
if !vterrors.Equals(err, le.err) {
le.firstSeen = time.Now()
le.err = err
}
// The error is unchanged so we don't need to do anything
}

func (le *lastError) shouldRetry() bool {
le.mu.Lock()
defer le.mu.Unlock()
if !le.firstSeen.IsZero() && time.Since(le.firstSeen) > le.maxTimeInError {
log.Errorf("VReplication encountered the same error continuously since %s, we will assume this is a non-recoverable error and will not retry anymore; the workflow will need to be manually restarted once error '%s' has been addressed",
le.firstSeen.UTC(), le.err)
return false
}
return true
}
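
record() relies on vterrors.Equals rather than == to decide whether an incoming error is the same one already being tracked, so a freshly allocated error with the same code and message does not reset firstSeen. The standalone sketch below illustrates that comparison; it assumes vterrors.Equals matches on error code and message, which is the behavior record() depends on.

package main

import (
	"fmt"

	vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
	"vitess.io/vitess/go/vt/vterrors"
)

func main() {
	a := vterrors.Errorf(vtrpcpb.Code_INTERNAL, "duplicate entry '1' for key 'PRIMARY'")
	b := vterrors.Errorf(vtrpcpb.Code_INTERNAL, "duplicate entry '1' for key 'PRIMARY'")
	c := vterrors.Errorf(vtrpcpb.Code_INTERNAL, "some other failure")

	fmt.Println(a == b)                // false: distinct error values
	fmt.Println(vterrors.Equals(a, b)) // true: same code and message, so record() keeps the original firstSeen
	fmt.Println(vterrors.Equals(a, c)) // false: record() treats this as a new error and restarts the clock
}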
55 changes: 55 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/last_error_test.go
@@ -0,0 +1,55 @@
/*
Copyright 2022 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vreplication

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestLastError(t *testing.T) {
le := newLastError("test", 100*time.Millisecond)

t.Run("long running error", func(t *testing.T) {
err1 := fmt.Errorf("test1")
le.record(err1)
require.True(t, le.shouldRetry())
time.Sleep(150 * time.Millisecond)
require.False(t, le.shouldRetry())
})

t.Run("new long running error", func(t *testing.T) {
err2 := fmt.Errorf("test2")
le.record(err2)
require.True(t, le.shouldRetry())
for i := 1; i < 10; i++ {
le.record(err2)
}
require.True(t, le.shouldRetry())
time.Sleep(150 * time.Millisecond)
le.record(err2)
require.False(t, le.shouldRetry())
})

t.Run("no error", func(t *testing.T) {
le.record(nil)
require.True(t, le.shouldRetry())
})
}
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/utils.go
@@ -22,6 +22,8 @@ import (
"fmt"
"strconv"

"vitess.io/vitess/go/vt/log"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/sqlparser"

@@ -161,6 +163,7 @@ func isUnrecoverableError(err error) bool {
mysql.ERInvalidCastToJSON,
mysql.ERJSONValueTooBig,
mysql.ERJSONDocumentTooDeep:
log.Errorf("Got unrecoverable error: %v", sqlErr)
return true
}
return false
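
The in-package sketch below shows the intended classification: an error from the list above is treated as unrecoverable and would move the workflow straight to the Error state, while an ordinary transient error is left to the normal retry logic. It is an illustration only; it assumes the mysql.NewSQLError(number, sqlState, format) constructor, and the SQL state value is not consulted by the classifier.

package vreplication

import (
	"fmt"

	"vitess.io/vitess/go/mysql"
)

// exampleUnrecoverableClassification is illustrative and not part of this
// commit; it must live in the vreplication package because
// isUnrecoverableError is unexported.
func exampleUnrecoverableClassification() {
	fatal := mysql.NewSQLError(mysql.ERJSONValueTooBig, "HY000", "value too big for JSON column")
	transient := fmt.Errorf("connection refused")

	fmt.Println(isUnrecoverableError(fatal))     // true: retried no further, workflow goes to Error
	fmt.Println(isUnrecoverableError(transient)) // false: still subject to retryDelay and maxTimeToRetryError
}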
@@ -2374,7 +2374,6 @@ func TestRestartOnVStreamEnd(t *testing.T) {
"/update _vt.vreplication set message='vstream ended'",
})
streamerEngine.Open()

execStatements(t, []string{
"insert into t1 values(2, 'aaa')",
})
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
@@ -210,8 +210,8 @@ func (vr *vreplicator) replicate(ctx context.Context) error {
if err != nil {
return err
}
// If any of the operations below changed state to Stopped, we should return.
if settings.State == binlogplayer.BlpStopped {
// If any of the operations below changed state to Stopped or Error, we should return.
if settings.State == binlogplayer.BlpStopped || settings.State == binlogplayer.BlpError {
return nil
}
switch {
