CherryPick(#15797): VReplication: Improve handling of vplayer stalls (#5383)

* cherry pick of 15797

* Extend unit race test timeout too

Signed-off-by: Matt Lord <mattalord@gmail.com>

---------

Signed-off-by: Matt Lord <mattalord@gmail.com>
Co-authored-by: Matt Lord <mattalord@gmail.com>
planetscale-actions-bot and mattlord authored Jun 14, 2024
1 parent 98d872e commit 400fd7e
Showing 8 changed files with 265 additions and 62 deletions.
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/cluster_test.go
@@ -60,7 +60,7 @@ var (
"--buffer_size", "250000", "--buffer_min_time_between_failovers", "1s", "--buffer_max_failover_duration", loadTestBufferingWindowDuration.String(),
"--buffer_drain_concurrency", "10"}
extraVtctldArgs = []string{"--remote_operation_timeout", "600s", "--topo_etcd_lease_ttl", "120"}
// This variable can be used within specific tests to alter vttablet behavior
// This variable can be used within specific tests to alter vttablet behavior.
extraVTTabletArgs = []string{}

parallelInsertWorkers = "--vreplication-parallel-insert-workers=4"
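To illustrate the comment on extraVTTabletArgs above: a minimal sketch, not part of this commit, of how a specific end-to-end test could use that hook. Only extraVTTabletArgs and parallelInsertWorkers are taken from the file; the test name and body are hypothetical.

// Hypothetical example: opt a single test into the parallel-insert-workers flag.
func testWithParallelInsertWorkers(t *testing.T) {
	extraVTTabletArgs = append(extraVTTabletArgs, parallelInsertWorkers)
	// Reset so later tests start vttablets without the extra flag.
	defer func() { extraVTTabletArgs = nil }()
	// ... start the cluster and run the workflow under test as usual ...
}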
18 changes: 9 additions & 9 deletions go/vt/vttablet/tabletmanager/vreplication/framework_test.go
@@ -32,8 +32,6 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/sqlparser"

_flag "vitess.io/vitess/go/internal/flag"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/replication"
@@ -46,6 +44,7 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/sidecardb"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vttablet"
"vitess.io/vitess/go/vt/vttablet/queryservice"
@@ -74,6 +73,7 @@ var (
testForeignKeyQueries = false
testSetForeignKeyQueries = false
doNotLogDBQueries = false
recvTimeout = 5 * time.Second
)

type LogExpectation struct {
@@ -492,14 +492,14 @@ func (dbc *realDBClient) ExecuteFetch(query string, maxrows int) (*sqltypes.Resu
return qr, err
}

func (dc *realDBClient) ExecuteFetchMulti(query string, maxrows int) ([]*sqltypes.Result, error) {
func (dbc *realDBClient) ExecuteFetchMulti(query string, maxrows int) ([]*sqltypes.Result, error) {
queries, err := sqlparser.NewTestParser().SplitStatementToPieces(query)
if err != nil {
return nil, err
}
results := make([]*sqltypes.Result, 0, len(queries))
for _, query := range queries {
qr, err := dc.ExecuteFetch(query, maxrows)
qr, err := dbc.ExecuteFetch(query, maxrows)
if err != nil {
return nil, err
}
@@ -518,7 +518,7 @@ func expectDeleteQueries(t *testing.T) {
"/delete from _vt.vreplication",
"/delete from _vt.copy_state",
"/delete from _vt.post_copy_action",
))
), recvTimeout)
}

func deleteAllVReplicationStreams(t *testing.T) {
@@ -635,7 +635,7 @@ func expectDBClientQueries(t *testing.T, expectations qh.ExpectationSequence, sk
))
}
case <-time.After(5 * time.Second):
t.Fatalf("no query received")
require.FailNow(t, "no query received")
failed = true
}
}
@@ -656,7 +656,7 @@

// expectNontxQueries disregards transactional statements like begin and commit.
// It also disregards updates to _vt.vreplication.
func expectNontxQueries(t *testing.T, expectations qh.ExpectationSequence) {
func expectNontxQueries(t *testing.T, expectations qh.ExpectationSequence, recvTimeout time.Duration) {
t.Helper()
if doNotLogDBQueries {
return
@@ -684,8 +684,8 @@
"query:%q\nmessage:%s\nexpectation:%s\nmatched:%t\nerror:%v\nhistory:%s",
got, result.Message, result.Expectation, result.Matched, result.Error, validator.History(),
))
case <-time.After(5 * time.Second):
t.Fatalf("no query received")
case <-time.After(recvTimeout):
require.FailNow(t, "no query received")
failed = true
}
}
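With this change the non-transactional query helper takes an explicit receive timeout instead of a hard-coded five seconds. A minimal sketch, assuming the package-level recvTimeout shown above and a made-up expected query, of the updated call shape in a test:

// Illustrative only: callers now pass the receive timeout explicitly.
expectNontxQueries(t, qh.Immediately(
	"insert into dst(id,val) values (1,'aaa')",
), recvTimeout)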
41 changes: 38 additions & 3 deletions go/vt/vttablet/tabletmanager/vreplication/relaylog.go
@@ -17,15 +17,18 @@ limitations under the License.
package vreplication

import (
"context"
"io"
"sync"
"time"

"context"
"vitess.io/vitess/go/vt/vterrors"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

const relayLogIOStalledMsg = "relay log I/O stalled"

type relayLog struct {
ctx context.Context
maxItems int
@@ -72,12 +75,18 @@ func (rl *relayLog) Send(events []*binlogdatapb.VEvent) error {
if err := rl.checkDone(); err != nil {
return err
}
cancelTimer := rl.startSendTimer()
defer cancelTimer()
for rl.curSize > rl.maxSize || len(rl.items) >= rl.maxItems {
rl.canAccept.Wait()
if rl.timedout {
return vterrors.Wrap(errVPlayerStalled, relayLogIOStalledMsg)
}
if err := rl.checkDone(); err != nil {
return err
}
}
rl.timedout = false
rl.items = append(rl.items, events)
rl.curSize += eventsSize(events)
rl.hasItems.Broadcast()
@@ -92,7 +101,7 @@ func (rl *relayLog) Fetch() ([][]*binlogdatapb.VEvent, error) {
if err := rl.checkDone(); err != nil {
return nil, err
}
cancelTimer := rl.startTimer()
cancelTimer := rl.startFetchTimer()
defer cancelTimer()
for len(rl.items) == 0 && !rl.timedout {
rl.hasItems.Wait()
@@ -117,7 +126,33 @@ func (rl *relayLog) checkDone() error {
return nil
}

func (rl *relayLog) startTimer() (cancel func()) {
// startSendTimer starts a timer that will wake up the sender if we hit
// the vplayerProgressDeadline timeout. This ensures that we don't
// block forever if the vplayer cannot process the previous relay log
// contents in a timely manner; allowing us to provide the user with a
// helpful error message.
func (rl *relayLog) startSendTimer() (cancel func()) {
timer := time.NewTimer(vplayerProgressDeadline)
timerDone := make(chan struct{})
go func() {
select {
case <-timer.C:
rl.mu.Lock()
defer rl.mu.Unlock()
rl.timedout = true
rl.canAccept.Broadcast()
case <-timerDone:
}
}()
return func() {
timer.Stop()
close(timerDone)
}
}

// startFetchTimer starts a timer that will wake up the fetcher after
// idleTimeout to be sure that we're regularly checking for new events.
func (rl *relayLog) startFetchTimer() (cancel func()) {
timer := time.NewTimer(idleTimeout)
timerDone := make(chan struct{})
go func() {
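The stall detection in relayLog.Send relies on a helper goroutine that, once vplayerProgressDeadline elapses, sets a flag under the mutex and broadcasts on the condition variable so the blocked sender wakes up and can return errVPlayerStalled instead of waiting forever. Below is a self-contained sketch of that wake-up pattern; it uses time.AfterFunc rather than the timer-plus-done-channel of the real code, and all names in it are illustrative:

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

type stallDemo struct {
	mu       sync.Mutex
	cond     *sync.Cond
	full     bool // stands in for the relay log being over its size limits
	timedout bool
}

// send blocks while the buffer is "full" and reports a stall if it is still
// blocked when the deadline fires, mirroring relayLog.Send/startSendTimer.
func (s *stallDemo) send(deadline time.Duration) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	timer := time.AfterFunc(deadline, func() {
		s.mu.Lock()
		defer s.mu.Unlock()
		s.timedout = true
		s.cond.Broadcast() // wake the blocked sender so it can observe the timeout
	})
	defer timer.Stop()
	for s.full {
		s.cond.Wait()
		if s.timedout {
			return errors.New("relay log I/O stalled") // analogous to errVPlayerStalled
		}
	}
	return nil
}

func main() {
	d := &stallDemo{full: true}
	d.cond = sync.NewCond(&d.mu)
	fmt.Println(d.send(50 * time.Millisecond))
}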
20 changes: 10 additions & 10 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go
@@ -182,7 +182,7 @@ func testPlayerCopyCharPK(t *testing.T) {
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"idc\\" type:BINARY charset:63 flags:20611} rows:{lengths:2 values:\\"c\\\\x00\\"}'.*`,
"/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*dst",
"/update _vt.vreplication set state='Running",
))
), recvTimeout)

expectData(t, "dst", [][]string{
{"a\000", "3"},
@@ -304,7 +304,7 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) {
}).Then(qh.Immediately(
"/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*dst",
"/update _vt.vreplication set state='Running'",
)))
)), recvTimeout)

expectData(t, "dst", [][]string{
{"a", "1"},
@@ -415,7 +415,7 @@ func testPlayerCopyVarcharCompositePKCaseSensitiveCollation(t *testing.T) {
// Wrap-up.
"/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*dst",
"/update _vt.vreplication set state='Running'",
))
), recvTimeout)

expectData(t, "dst", [][]string{
{"1", "B", "B", "3"},
@@ -790,7 +790,7 @@ func testPlayerCopyBigTable(t *testing.T) {
// Copy is done. Go into running state.
// All tables copied. Final catch up followed by Running state.
"/update _vt.vreplication set state='Running'",
)))
)), recvTimeout)

expectData(t, "dst", [][]string{
{"1", "aaa"},
@@ -918,7 +918,7 @@ func testPlayerCopyWildcardRule(t *testing.T) {
"/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*src",
// Copy is done. Go into running state.
"/update _vt.vreplication set state='Running'",
)))
)), recvTimeout)

expectData(t, "src", [][]string{
{"1", "aaa"},
@@ -1078,7 +1078,7 @@ func testPlayerCopyTableContinuation(t *testing.T) {
)).Then(qh.Immediately(
"/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*not_copied",
"/update _vt.vreplication set state='Running'",
)))
)), recvTimeout)

expectData(t, "dst1", [][]string{
{"1", "insert in"},
@@ -1188,7 +1188,7 @@ func testPlayerCopyWildcardTableContinuation(t *testing.T) {
`/insert into _vt.copy_state .*`,
"/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*dst",
"/update _vt.vreplication set state='Running'",
)))
)), recvTimeout)

expectData(t, "dst", [][]string{
{"2", "copied"},
@@ -1279,7 +1279,7 @@ func TestPlayerCopyWildcardTableContinuationWithOptimizeInserts(t *testing.T) {
`/insert into _vt.copy_state .*`,
"/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*dst",
"/update _vt.vreplication set state='Running'",
))
), recvTimeout)
expectData(t, "dst", [][]string{
{"2", "copied"},
{"3", "uncopied"},
@@ -1659,7 +1659,7 @@ func testPlayerCopyTablesWithGeneratedColumn(t *testing.T) {
// copy of dst2 is done: delete from copy_state.
"/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*dst2",
"/update _vt.vreplication set state",
))
), recvTimeout)

expectData(t, "dst1", [][]string{
{"1", "aaa", "1aaa", "aaa1", "10"},
@@ -1826,7 +1826,7 @@ func testCopyInvisibleColumns(t *testing.T) {
// copy of dst1 is done: delete from copy_state.
"/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*dst1",
"/update _vt.vreplication set state='Running'",
))
), recvTimeout)

expectData(t, "dst1", [][]string{
{"1", "10"},
44 changes: 39 additions & 5 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
@@ -30,12 +30,25 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

const failedToRecordHeartbeatMsg = "failed to record heartbeat"

var (
// At what point should we consider the vplayer to be stalled and return an error.
// 5 minutes is well beyond a reasonable amount of time for a transaction to be
// replicated.
vplayerProgressDeadline = time.Duration(5 * time.Minute)

// The error to return when we have detected a stall in the vplayer.
errVPlayerStalled = errors.New("progress stalled; vplayer was unable to replicate the transaction in a timely manner; examine the target mysqld instance health and the replicated queries' EXPLAIN output to see why queries are taking unusually long")
)

// vplayer replays binlog events by pulling them from a vstreamer.
type vplayer struct {
vr *vreplicator
@@ -367,12 +380,13 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row
return nil
}

// updatePos should get called at a minimum of vreplicationMinimumHeartbeatUpdateInterval.
func (vp *vplayer) updatePos(ctx context.Context, ts int64) (posReached bool, err error) {
vp.numAccumulatedHeartbeats = 0
update := binlogplayer.GenerateUpdatePos(vp.vr.id, vp.pos, time.Now().Unix(), ts, vp.vr.stats.CopyRowCount.Get(), vreplicationStoreCompressedGTID)
if _, err := vp.query(ctx, update); err != nil {
return false, fmt.Errorf("error %v updating position", err)
}
vp.numAccumulatedHeartbeats = 0
vp.unsavedEvent = nil
vp.timeLastSaved = time.Now()
vp.vr.stats.SetLastPosition(vp.pos)
@@ -399,8 +413,16 @@ func (vp *vplayer) recordHeartbeat() error {
if !vp.mustUpdateHeartbeat() {
return nil
}
if err := vp.vr.updateHeartbeatTime(tm); err != nil {
return vterrors.Wrapf(errVPlayerStalled, fmt.Sprintf("%s: %v", failedToRecordHeartbeatMsg, err))
}
// Only reset the pending heartbeat count if the update was successful.
// Otherwise the vplayer may not actually be making progress and nobody
// is aware of it -- resulting in the com_binlog_dump connection on the
// source that is managed by the binlog_player getting closed by mysqld
// when the source_net_timeout is hit.
vp.numAccumulatedHeartbeats = 0
return vp.vr.updateHeartbeatTime(tm)
return nil
}

// applyEvents is the main thread that applies the events. It has the following use
@@ -438,7 +460,7 @@ func (vp *vplayer) recordHeartbeat() error {
// current position to be saved.
//
// In order to handle the above use cases, we use an implicit transaction scheme:
// A BEGIN does not really start a transaction. Ony a ROW event does. With this
// A BEGIN does not really start a transaction. Only a ROW event does. With this
// approach, no transaction gets started if an empty one arrives. If a we receive
// a commit, and a we are not in a transaction, we infer that the transaction was
// empty, and remember it as an unsaved event. The next GTID event will reset the
@@ -497,6 +519,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
return nil
}
}

for i, events := range items {
for j, event := range events {
if event.Timestamp != 0 {
@@ -526,7 +549,17 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
if err := vp.applyEvent(ctx, event, mustSave); err != nil {
if err != io.EOF {
vp.vr.stats.ErrorCounts.Add([]string{"Apply"}, 1)
log.Errorf("Error applying event: %s", err.Error())
var table, tableLogMsg string
switch {
case event.GetFieldEvent() != nil:
table = event.GetFieldEvent().TableName
case event.GetRowEvent() != nil:
table = event.GetRowEvent().TableName
}
if table != "" {
tableLogMsg = fmt.Sprintf(" for table %s", table)
}
log.Errorf("Error applying event%s: %s", tableLogMsg, err.Error())
}
return err
}
@@ -635,7 +668,8 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
log.Infof("Error applying row event: %s", err.Error())
return err
}
//Row event is logged AFTER RowChanges are applied so as to calculate the total elapsed time for the Row event
// Row event is logged AFTER RowChanges are applied so as to calculate the total elapsed
// time for the Row event.
stats.Send(fmt.Sprintf("%v", event.RowEvent))
case binlogdatapb.VEventType_OTHER:
if vp.vr.dbClient.InTransaction {