Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: tracking rows_copied #7980

Merged
merged 9 commits into from
Apr 29, 2021
18 changes: 12 additions & 6 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ func (blp *BinlogPlayer) writeRecoveryPosition(tx *binlogdatapb.BinlogTransactio
}

now := time.Now().Unix()
updateRecovery := GenerateUpdatePos(blp.uid, position, now, tx.EventToken.Timestamp)
updateRecovery := GenerateUpdatePos(blp.uid, position, now, tx.EventToken.Timestamp, blp.blplStats.CopyRowCount.Get())

qr, err := blp.exec(updateRecovery)
if err != nil {
Expand Down Expand Up @@ -554,6 +554,12 @@ var AlterVReplicationTable = []string{
"ALTER TABLE _vt.vreplication ADD COLUMN db_name VARBINARY(255) NOT NULL",
"ALTER TABLE _vt.vreplication MODIFY source BLOB NOT NULL",
"ALTER TABLE _vt.vreplication ADD KEY workflow_idx (workflow(64))",
"ALTER TABLE _vt.vreplication ADD COLUMN rows_copied BIGINT(20) NOT NULL DEFAULT 0",
}

var WithDDLInitialQueries = []string{
"SELECT db_name FROM _vt.vreplication LIMIT 0",
"SELECT rows_copied FROM _vt.vreplication LIMIT 0",
}

// VRSettings contains the settings of a vreplication table.
Expand Down Expand Up @@ -624,16 +630,16 @@ func CreateVReplicationState(workflow string, source *binlogdatapb.BinlogSource,

// GenerateUpdatePos returns a statement to update a value in the
// _vt.vreplication table.
func GenerateUpdatePos(uid uint32, pos mysql.Position, timeUpdated int64, txTimestamp int64) string {
func GenerateUpdatePos(uid uint32, pos mysql.Position, timeUpdated int64, txTimestamp int64, rowsCopied int64) string {
if txTimestamp != 0 {
return fmt.Sprintf(
"update _vt.vreplication set pos=%v, time_updated=%v, transaction_timestamp=%v, message='' where id=%v",
encodeString(mysql.EncodePosition(pos)), timeUpdated, txTimestamp, uid)
"update _vt.vreplication set pos=%v, time_updated=%v, transaction_timestamp=%v, rows_copied=%v, message='' where id=%v",
encodeString(mysql.EncodePosition(pos)), timeUpdated, txTimestamp, rowsCopied, uid)
}

return fmt.Sprintf(
"update _vt.vreplication set pos=%v, time_updated=%v, message='' where id=%v",
encodeString(mysql.EncodePosition(pos)), timeUpdated, uid)
"update _vt.vreplication set pos=%v, time_updated=%v, rows_copied=%v, message='' where id=%v",
encodeString(mysql.EncodePosition(pos)), timeUpdated, rowsCopied, uid)
}

// GenerateUpdateTime returns a statement to update time_updated in the _vt.vreplication table.
Expand Down
8 changes: 4 additions & 4 deletions go/vt/binlog/binlogplayer/binlog_player_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,10 +358,10 @@ func TestCreateVReplicationTables(t *testing.T) {
func TestUpdateVReplicationPos(t *testing.T) {
gtid := mysql.MustParseGTID("MariaDB", "0-1-8283")
want := "update _vt.vreplication " +
"set pos='MariaDB/0-1-8283', time_updated=88822, message='' " +
"set pos='MariaDB/0-1-8283', time_updated=88822, rows_copied=0, message='' " +
"where id=78522"

got := GenerateUpdatePos(78522, mysql.Position{GTIDSet: gtid.GTIDSet()}, 88822, 0)
got := GenerateUpdatePos(78522, mysql.Position{GTIDSet: gtid.GTIDSet()}, 88822, 0, 0)
if got != want {
t.Errorf("updateVReplicationPos() = %#v, want %#v", got, want)
}
Expand All @@ -370,10 +370,10 @@ func TestUpdateVReplicationPos(t *testing.T) {
func TestUpdateVReplicationTimestamp(t *testing.T) {
gtid := mysql.MustParseGTID("MariaDB", "0-2-582")
want := "update _vt.vreplication " +
"set pos='MariaDB/0-2-582', time_updated=88822, transaction_timestamp=481828, message='' " +
"set pos='MariaDB/0-2-582', time_updated=88822, transaction_timestamp=481828, rows_copied=0, message='' " +
"where id=78522"

got := GenerateUpdatePos(78522, mysql.Position{GTIDSet: gtid.GTIDSet()}, 88822, 481828)
got := GenerateUpdatePos(78522, mysql.Position{GTIDSet: gtid.GTIDSet()}, 88822, 481828, 0)
if got != want {
t.Errorf("updateVReplicationPos() = %#v, want %#v", got, want)
}
Expand Down
48 changes: 36 additions & 12 deletions go/vt/binlog/binlogplayer/mock_dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ const mockClientUNameDba = "Dba"
// MockDBClient mocks a DBClient.
// It must be configured to expect requests in a specific order.
type MockDBClient struct {
t *testing.T
UName string
expect []*mockExpect
currentResult int
done chan struct{}
invariants map[string]*sqltypes.Result
t *testing.T
UName string
expect []*mockExpect
currentResult int
done chan struct{}
queriesToIgnore []*mockExpect // these queries will return a standard nil result, you SHOULD NOT expect them in the tests
invariants map[string]*sqltypes.Result
}

type mockExpect struct {
Expand All @@ -46,12 +47,28 @@ type mockExpect struct {
err error
}

func getQueriesToIgnore() []*mockExpect {
var queriesToIgnore []*mockExpect
for _, query := range WithDDLInitialQueries {
exp := &mockExpect{
query: query,
re: nil,
result: &sqltypes.Result{},
err: nil,
}
queriesToIgnore = append(queriesToIgnore, exp)

}
return queriesToIgnore
}

// NewMockDBClient returns a new DBClientMock with the default "Filtered" UName.
func NewMockDBClient(t *testing.T) *MockDBClient {
return &MockDBClient{
t: t,
UName: mockClientUNameFiltered,
done: make(chan struct{}),
t: t,
UName: mockClientUNameFiltered,
done: make(chan struct{}),
queriesToIgnore: getQueriesToIgnore(),
invariants: map[string]*sqltypes.Result{
"CREATE TABLE IF NOT EXISTS _vt.vreplication_log": {},
"select id, type, state, message from _vt.vreplication_log": {},
Expand All @@ -60,11 +77,13 @@ func NewMockDBClient(t *testing.T) *MockDBClient {
}
}

// NewMockDbaClient returns a new DBClientMock with the default "Dba" UName.
func NewMockDbaClient(t *testing.T) *MockDBClient {
return &MockDBClient{
t: t,
UName: mockClientUNameDba,
done: make(chan struct{}),
t: t,
UName: mockClientUNameDba,
done: make(chan struct{}),
queriesToIgnore: getQueriesToIgnore(),
}
}

Expand Down Expand Up @@ -150,6 +169,11 @@ func (dc *MockDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Re
dc.t.Helper()
dc.t.Logf("DBClient query: %v", query)

for _, q := range dc.queriesToIgnore {
if strings.EqualFold(q.query, query) || strings.Contains(strings.ToLower(query), strings.ToLower(q.query)) {
return q.result, q.err
}
}
for q, result := range dc.invariants {
if strings.Contains(query, q) {
return result, nil
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ var ghostOverridePath = flag.String("gh-ost-path", "", "override default gh-ost
var ptOSCOverridePath = flag.String("pt-osc-path", "", "override default pt-online-schema-change binary full path")
var migrationCheckInterval = flag.Duration("migration_check_interval", 1*time.Minute, "Interval between migration checks")
var retainOnlineDDLTables = flag.Duration("retain_online_ddl_tables", 24*time.Hour, "How long should vttablet keep an old migrated table before purging it")
var migrationNextCheckIntervals = []time.Duration{1 * time.Second, 5 * time.Second}
var migrationNextCheckIntervals = []time.Duration{1 * time.Second, 5 * time.Second, 10 * time.Second, 20 * time.Second}

const (
maxPasswordLength = 32 // MySQL's *replication* password may not exceed 32 characters
Expand Down Expand Up @@ -1635,12 +1635,12 @@ func (e *Executor) evaluateDeclarativeDiff(ctx context.Context, onlineDDL *schem
// - what's the migration strategy?
// The function invokes the appropriate handlers for each of those cases.
func (e *Executor) executeMigration(ctx context.Context, onlineDDL *schema.OnlineDDL) error {
defer e.triggerNextCheckInterval()
failMigration := func(err error) error {
_ = e.updateMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusFailed)
if err != nil {
_ = e.updateMigrationMessage(ctx, onlineDDL.UUID, err.Error())
}
e.triggerNextCheckInterval()
return err
}

Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@ func (ct *controller) runBlp(ctx context.Context) (err error) {
if err := dbClient.Connect(); err != nil {
return vterrors.Wrap(err, "can't connect to database")
}
for _, query := range withDDLInitialQueries {
if _, err := withDDL.Exec(ctx, query, dbClient.ExecuteFetch); err != nil {
log.Errorf("cannot apply withDDL init query '%s': %v", query, err)
}
}
defer dbClient.Close()

var tablet *topodatapb.Tablet
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ const (
)

var withDDL *withddl.WithDDL
var withDDLInitialQueries []string

const (
throttlerAppName = "vreplication"
Expand All @@ -74,6 +75,8 @@ func init() {
allddls = append(allddls, createReshardingJournalTable, createCopyState)
allddls = append(allddls, createVReplicationLog)
withDDL = withddl.New(allddls)

withDDLInitialQueries = append(withDDLInitialQueries, binlogplayer.WithDDLInitialQueries...)
}

// this are the default tablet_types that will be used by the tablet picker to find sources for a vreplication stream
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletmanager/vreplication/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,7 @@ func TestCreateDBAndTable(t *testing.T) {
dbClient.ExpectRequestRE("ALTER TABLE _vt.vreplication ADD COLUMN db_name.*", &sqltypes.Result{}, nil)
dbClient.ExpectRequestRE("ALTER TABLE _vt.vreplication MODIFY source.*", &sqltypes.Result{}, nil)
dbClient.ExpectRequestRE("ALTER TABLE _vt.vreplication ADD KEY.*", &sqltypes.Result{}, nil)
dbClient.ExpectRequestRE("ALTER TABLE _vt.vreplication ADD COLUMN rows_copied.*", &sqltypes.Result{}, nil)
dbClient.ExpectRequestRE("create table if not exists _vt.resharding_journal.*", &sqltypes.Result{}, nil)
dbClient.ExpectRequestRE("create table if not exists _vt.copy_state.*", &sqltypes.Result{}, nil)
}
Expand Down
14 changes: 14 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,9 @@ func shouldIgnoreQuery(query string) bool {
}

func expectDBClientQueries(t *testing.T, queries []string) {
extraQueries := withDDL.DDLs()
extraQueries = append(extraQueries, withDDLInitialQueries...)
// Either 'queries' or 'queriesWithDDLs' must match globalDBQueries
t.Helper()
failed := false
for i, query := range queries {
Expand All @@ -493,6 +496,11 @@ func expectDBClientQueries(t *testing.T, queries []string) {
if shouldIgnoreQuery(got) {
goto retry
}
for _, extraQuery := range extraQueries {
if got == extraQuery {
goto retry
}
}

var match bool
if query[0] == '/' {
Expand Down Expand Up @@ -531,6 +539,7 @@ func expectNontxQueries(t *testing.T, queries []string) {
t.Helper()
failed := false

skipQueries := withDDLInitialQueries
for i, query := range queries {
if failed {
t.Errorf("no query received, expecting %s", query)
Expand All @@ -544,6 +553,11 @@ func expectNontxQueries(t *testing.T, queries []string) {
shouldIgnoreQuery(got) {
goto retry
}
for _, skipQuery := range skipQueries {
if got == skipQuery {
goto retry
}
}

var match bool
if query[0] == '/' {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vcopier.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (vc *vcopier) fastForward(ctx context.Context, copyState map[string]*sqltyp
return err
}
if settings.StartPos.IsZero() {
update := binlogplayer.GenerateUpdatePos(vc.vr.id, pos, time.Now().Unix(), 0)
update := binlogplayer.GenerateUpdatePos(vc.vr.id, pos, time.Now().Unix(), 0, vc.vr.stats.CopyRowCount.Get())
_, err := vc.vr.dbClient.Execute(update)
return err
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row

func (vp *vplayer) updatePos(ts int64) (posReached bool, err error) {
vp.numAccumulatedHeartbeats = 0
update := binlogplayer.GenerateUpdatePos(vp.vr.id, vp.pos, time.Now().Unix(), ts)
update := binlogplayer.GenerateUpdatePos(vp.vr.id, vp.pos, time.Now().Unix(), ts, vp.vr.stats.CopyRowCount.Get())
if _, err := vp.vr.dbClient.Execute(update); err != nil {
return false, fmt.Errorf("error %v updating position", err)
}
Expand Down
3 changes: 0 additions & 3 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,6 @@ type vstreamer struct {
vse *Engine
}

// CopyState contains the last PK for tables to be copied
type CopyState map[string][]*sqltypes.Result

// streamerPlan extends the original plan to also include
// the TableMap, which comes from the binlog. It's used
// to extract values from the ROW events.
Expand Down
5 changes: 5 additions & 0 deletions go/vt/withddl/withddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ func New(ddls []string) *WithDDL {
}
}

// DDLs returns a copy of the ddls
func (wd *WithDDL) DDLs() []string {
return wd.ddls[:]
}

// Exec executes the query using the supplied function.
// If there are any schema errors, it applies the DDLs and retries.
// Funcs can be any of these types:
Expand Down