Skip to content

Commit

Permalink
Add unit test for retry
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Nov 29, 2022
1 parent ebf5c15 commit 1b2ca7a
Showing 1 changed file with 80 additions and 10 deletions.
90 changes: 80 additions & 10 deletions go/vt/vttablet/tabletmanager/vdiff/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
Expand All @@ -34,9 +35,10 @@ import (
var (
wfName = "testwf"
optionsJS = `{"core_options": {"auto_retry": true}}`
vdiffTestCols = "id|vdiff_uuid|workflow|keyspace|shard|db_name|state|options"
vdiffTestColTypes = "int64|varchar|varbinary|varbinary|varchar|varbinary|varbinary|json"
vdiffTestCols = "id|vdiff_uuid|workflow|keyspace|shard|db_name|state|options|last_error"
vdiffTestColTypes = "int64|varchar|varbinary|varbinary|varchar|varbinary|varbinary|json|varbinary"
singleRowAffected = &sqltypes.Result{RowsAffected: 1}
noResults = &sqltypes.Result{}
testSchema = &tabletmanagerdatapb.SchemaDefinition{
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{
{
Expand Down Expand Up @@ -94,7 +96,7 @@ func TestEngineOpen(t *testing.T) {
vdiffTestCols,
vdiffTestColTypes,
),
fmt.Sprintf("1|%s|%s|%s|%s|%s|%s|%s", UUID, wfName, env.KeyspaceName, env.ShardName, vdiffdb, tt.state, optionsJS),
fmt.Sprintf("1|%s|%s|%s|%s|%s|%s|%s|", UUID, wfName, env.KeyspaceName, env.ShardName, vdiffdb, tt.state, optionsJS),
)

dbClient.ExpectRequest("select * from _vt.vdiff where state in ('started','pending')", initialQR, nil)
Expand All @@ -103,7 +105,7 @@ func TestEngineOpen(t *testing.T) {
vdiffTestCols,
vdiffTestColTypes,
),
fmt.Sprintf("1|%s|%s|%s|%s|%s|pending|%s", UUID, wfName, env.KeyspaceName, env.ShardName, vdiffdb, optionsJS),
fmt.Sprintf("1|%s|%s|%s|%s|%s|pending|%s|", UUID, wfName, env.KeyspaceName, env.ShardName, vdiffdb, optionsJS),
), nil)

dbClient.ExpectRequest(fmt.Sprintf("select * from _vt.vreplication where workflow = '%s' and db_name = '%s'", wfName, vdiffdb), sqltypes.MakeTestResult(sqltypes.MakeTestFields(
Expand Down Expand Up @@ -154,31 +156,99 @@ func TestEngineOpen(t *testing.T) {
}

func TestEngineRetryErroredVDiffs(t *testing.T) {
type fields struct {
}
UUID := uuid.New().String()
source := `keyspace:"testsrc" shard:"0" filter:{rules:{match:"t1" filter:"select * from t1"}}`
expectedControllerCnt := 0
tests := []struct {
name string
fields fields
name string
retryQueryResults *sqltypes.Result
expectRetry bool
}{
// TODO: Add test cases.
{
name: "nothing to retry",
retryQueryResults: noResults,
},
{
name: "non-ephemeral error",
retryQueryResults: sqltypes.MakeTestResult(sqltypes.MakeTestFields(
vdiffTestCols,
vdiffTestColTypes,
),
fmt.Sprintf("1|%s|%s|%s|%s|%s|error|%s|%v", UUID, wfName, env.KeyspaceName, env.ShardName, vdiffdb, optionsJS,
mysql.NewSQLError(mysql.ERNoSuchTable, "42S02", "Table 'foo' doesn't exist")),
),
},
{
name: "ephemeral error to retry",
retryQueryResults: sqltypes.MakeTestResult(sqltypes.MakeTestFields(
vdiffTestCols,
vdiffTestColTypes,
),
fmt.Sprintf("1|%s|%s|%s|%s|%s|pending|%s|Query execution was interrupted, maximum statement execution time exceeded (errno 3024) (sqlstate HY000) during query: select * from foo", UUID, wfName, env.KeyspaceName, env.ShardName, vdiffdb, optionsJS),
),
expectRetry: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tablet := addTablet(100)
tablet.Type = topodatapb.TabletType_PRIMARY
defer deleteTablet(tablet)
resetBinlogClient()
dbClient := binlogplayer.NewMockDBClient(t)
dbClientFactory := func() binlogplayer.DBClient { return dbClient }
tmc := newFakeTMClient()
tmc.schema = testSchema
vde := &Engine{
controllers: make(map[int64]*controller),
ts: env.TopoServ,
thisTablet: tablet,
dbClientFactoryFiltered: dbClientFactory,
dbClientFactoryDba: dbClientFactory,
dbName: vdiffdb,
testingTMC: tmc,
}
require.False(t, vde.IsOpen())

dbClient.ExpectRequest("select * from _vt.vdiff where state in ('started','pending')", noResults, nil)
vde.Open(context.Background(), vreplEngine)
defer vde.Close()
vde.retryErroredVDiffs()
assert.True(t, vde.IsOpen())
assert.Equal(t, 0, len(vde.controllers))

dbClient.ExpectRequest("select * from _vt.vdiff where state = 'error' and options->>'$.core_options.auto_retry' = 'true'", tt.retryQueryResults, nil)
for _, row := range tt.retryQueryResults.Rows {
id := row[0].ToString()
if tt.expectRetry {
dbClient.ExpectRequestRE("update _vt.vdiff as vd left join _vt.vdiff_table as vdt on \\(vd.id = vdt.vdiff_id\\) set vd.state = 'pending'.*", singleRowAffected, nil)
dbClient.ExpectRequest(fmt.Sprintf("select * from _vt.vdiff where id = %s", id), sqltypes.MakeTestResult(sqltypes.MakeTestFields(
vdiffTestCols,
vdiffTestColTypes,
),
fmt.Sprintf("1|%s|%s|%s|%s|%s|pending|%s|", UUID, wfName, env.KeyspaceName, env.ShardName, vdiffdb, optionsJS),
), nil)
dbClient.ExpectRequest(fmt.Sprintf("select * from _vt.vreplication where workflow = '%s' and db_name = '%s'", wfName, vdiffdb), sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|workflow|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type",
"int64|varbinary|blob|varbinary|varbinary|int64|int64|varbinary|varbinary|int64|int64|varbinary|varbinary|varbinary|int64|varbinary|int64|int64|int64|varchar|int64",
),
fmt.Sprintf("%s|%s|%s|MySQL56/f69ed286-6909-11ed-8342-0a50724f3211:1-110||9223372036854775807|9223372036854775807||PRIMARY,REPLICA|1669511347|0|Running||%s|200||1669511347|1|0||1", id, wfName, source, vdiffdb),
), nil)

// At this point we know that we kicked off the expected retry so we can short circit the vdiff.
dbClient.ExpectRequest(fmt.Sprintf("update _vt.vdiff set state = 'started', last_error = '' , started_at = utc_timestamp() where id = %s", id), nil, fmt.Errorf("This is far enough, yo"))
dbClient.ExpectRequest(fmt.Sprintf("insert into _vt.vdiff_log(vdiff_id, message) values (%s, 'Error: This is far enough, yo')", id), singleRowAffected, nil)
dbClient.ExpectRequest(fmt.Sprintf("update _vt.vdiff set state = 'error', last_error = 'This is far enough, yo' where id = %s", id), singleRowAffected, nil)
dbClient.ExpectRequest(fmt.Sprintf("insert into _vt.vdiff_log(vdiff_id, message) values (%s, 'State changed to: error')", id), singleRowAffected, nil)

expectedControllerCnt++
}
}
err := vde.retryVDiffs(vde.ctx)
assert.NoError(t, err)
assert.Equal(t, expectedControllerCnt, len(vde.controllers))

dbClient.Wait()
})
}

}

0 comments on commit 1b2ca7a

Please sign in to comment.