Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vreplication: table copying phase 1: create list of tables to copy #4736

Merged
merged 4 commits into from
Apr 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 37 additions & 18 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ var (
// BlplTransaction is the key for the stats map.
BlplTransaction = "Transaction"

// VReplicationInit is for the Init state.
VReplicationInit = "Init"
rafael marked this conversation as resolved.
Show resolved Hide resolved
// VReplicationCopying is for the Copying state.
VReplicationCopying = "Copying"
// BlpRunning is for the Running state.
BlpRunning = "Running"
// BlpStopped is for the Stopped state.
Expand Down Expand Up @@ -191,18 +195,18 @@ func (blp *BinlogPlayer) ApplyBinlogEvents(ctx context.Context) error {
// applyEvents returns a recordable status message on termination or an error otherwise.
func (blp *BinlogPlayer) applyEvents(ctx context.Context) error {
// Read starting values for vreplication.
pos, stopPos, maxTPS, maxReplicationLag, err := ReadVRSettings(blp.dbClient, blp.uid)
settings, err := ReadVRSettings(blp.dbClient, blp.uid)
if err != nil {
log.Error(err)
return err
}
blp.position, err = mysql.DecodePosition(pos)
blp.position, err = mysql.DecodePosition(settings.StartPos)
if err != nil {
log.Error(err)
return err
}
if stopPos != "" {
blp.stopPosition, err = mysql.DecodePosition(stopPos)
if settings.StopPos != "" {
blp.stopPosition, err = mysql.DecodePosition(settings.StopPos)
if err != nil {
log.Error(err)
return err
Expand All @@ -212,8 +216,8 @@ func (blp *BinlogPlayer) applyEvents(ctx context.Context) error {
fmt.Sprintf("BinlogPlayer/%d", blp.uid),
"transactions",
1, /* threadCount */
maxTPS,
maxReplicationLag,
settings.MaxTPS,
settings.MaxReplicationLag,
)
if err != nil {
err := fmt.Errorf("failed to instantiate throttler: %v", err)
Expand Down Expand Up @@ -512,29 +516,44 @@ func SetVReplicationState(dbClient DBClient, uid uint32, state, message string)
return nil
}

// VRSettings contains the settings of a vreplication table.
type VRSettings struct {
StartPos string
StopPos string
MaxTPS int64
MaxReplicationLag int64
State string
}

// ReadVRSettings retrieves the throttler settings for
// vreplication from the checkpoint table.
func ReadVRSettings(dbClient DBClient, uid uint32) (pos, stopPos string, maxTPS, maxReplicationLag int64, err error) {
query := fmt.Sprintf("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=%v", uid)
func ReadVRSettings(dbClient DBClient, uid uint32) (VRSettings, error) {
query := fmt.Sprintf("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=%v", uid)
qr, err := dbClient.ExecuteFetch(query, 1)
if err != nil {
return "", "", throttler.InvalidMaxRate, throttler.InvalidMaxReplicationLag, fmt.Errorf("error %v in selecting vreplication settings %v", err, query)
return VRSettings{}, fmt.Errorf("error %v in selecting vreplication settings %v", err, query)
}

if qr.RowsAffected != 1 {
return "", "", throttler.InvalidMaxRate, throttler.InvalidMaxReplicationLag, fmt.Errorf("checkpoint information not available in db for %v", uid)
return VRSettings{}, fmt.Errorf("checkpoint information not available in db for %v", uid)
}

maxTPS, err = sqltypes.ToInt64(qr.Rows[0][2])
maxTPS, err := sqltypes.ToInt64(qr.Rows[0][2])
if err != nil {
return "", "", throttler.InvalidMaxRate, throttler.InvalidMaxReplicationLag, fmt.Errorf("failed to parse max_tps column: %v", err)
return VRSettings{}, fmt.Errorf("failed to parse max_tps column: %v", err)
}
maxReplicationLag, err = sqltypes.ToInt64(qr.Rows[0][3])
maxReplicationLag, err := sqltypes.ToInt64(qr.Rows[0][3])
if err != nil {
return "", "", throttler.InvalidMaxRate, throttler.InvalidMaxReplicationLag, fmt.Errorf("failed to parse max_replication_lag column: %v", err)
return VRSettings{}, fmt.Errorf("failed to parse max_replication_lag column: %v", err)
}

return qr.Rows[0][0].ToString(), qr.Rows[0][1].ToString(), maxTPS, maxReplicationLag, nil
return VRSettings{
StartPos: qr.Rows[0][0].ToString(),
StopPos: qr.Rows[0][1].ToString(),
MaxTPS: maxTPS,
MaxReplicationLag: maxReplicationLag,
State: qr.Rows[0][4].ToString(),
}, nil
}

// CreateVReplication returns a statement to populate the first value into
Expand All @@ -546,12 +565,12 @@ func CreateVReplication(workflow string, source *binlogdatapb.BinlogSource, posi
encodeString(workflow), encodeString(source.String()), encodeString(position), maxTPS, maxReplicationLag, timeUpdated, BlpRunning)
}

// CreateVReplicationStopped returns a statement to create a stopped vreplication.
func CreateVReplicationStopped(workflow string, source *binlogdatapb.BinlogSource, position string) string {
// CreateVReplicationState returns a statement to create a stopped vreplication.
func CreateVReplicationState(workflow string, source *binlogdatapb.BinlogSource, position, state string) string {
rafael marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Sprintf("insert into _vt.vreplication "+
"(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state) "+
"values (%v, %v, %v, %v, %v, %v, 0, '%v')",
encodeString(workflow), encodeString(source.String()), encodeString(position), throttler.MaxRateModuleDisabled, throttler.ReplicationLagModuleDisabled, time.Now().Unix(), BlpStopped)
encodeString(workflow), encodeString(source.String()), encodeString(position), throttler.MaxRateModuleDisabled, throttler.ReplicationLagModuleDisabled, time.Now().Unix(), state)
}

// GenerateUpdatePos returns a statement to update a value in the
Expand Down
21 changes: 13 additions & 8 deletions go/vt/binlog/binlogplayer/binlog_player_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var (
sqltypes.NULL, // stop_pos
sqltypes.NewVarBinary("9223372036854775807"), // max_tps
sqltypes.NewVarBinary("9223372036854775807"), // max_replication_lag
sqltypes.NewVarBinary("Running"), // state
},
},
}
Expand All @@ -51,7 +52,7 @@ var (
func TestNewBinlogPlayerKeyRange(t *testing.T) {
dbClient := NewMockDBClient(t)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down Expand Up @@ -82,7 +83,7 @@ func TestNewBinlogPlayerKeyRange(t *testing.T) {
func TestNewBinlogPlayerTables(t *testing.T) {
dbClient := NewMockDBClient(t)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down Expand Up @@ -114,7 +115,7 @@ func TestNewBinlogPlayerTables(t *testing.T) {
func TestApplyEventsFail(t *testing.T) {
dbClient := NewMockDBClient(t)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, errors.New("err"))
dbClient.ExpectRequest("update _vt.vreplication set state='Error', message='error in processing binlog event failed query BEGIN, err: err' where id=1", testDMLResponse, nil)

Expand Down Expand Up @@ -145,10 +146,11 @@ func TestStopPosEqual(t *testing.T) {
sqltypes.NewVarBinary("MariaDB/0-1-1083"), // stop_pos
sqltypes.NewVarBinary("9223372036854775807"), // max_tps
sqltypes.NewVarBinary("9223372036854775807"), // max_replication_lag
sqltypes.NewVarBinary("Running"), // state
},
},
}
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", posEqual, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", posEqual, nil)
dbClient.ExpectRequest(`update _vt.vreplication set state='Stopped', message='not starting BinlogPlayer, we\'re already at the desired position 0-1-1083' where id=1`, testDMLResponse, nil)

_ = newFakeBinlogClient()
Expand Down Expand Up @@ -177,10 +179,11 @@ func TestStopPosLess(t *testing.T) {
sqltypes.NewVarBinary("MariaDB/0-1-1082"), // stop_pos
sqltypes.NewVarBinary("9223372036854775807"), // max_tps
sqltypes.NewVarBinary("9223372036854775807"), // max_replication_lag
sqltypes.NewVarBinary("Running"), // state
},
},
}
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", posEqual, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", posEqual, nil)
dbClient.ExpectRequest(`update _vt.vreplication set state='Stopped', message='starting point 0-1-1083 greater than stopping point 0-1-1082' where id=1`, testDMLResponse, nil)

_ = newFakeBinlogClient()
Expand Down Expand Up @@ -209,10 +212,11 @@ func TestStopPosGreater(t *testing.T) {
sqltypes.NewVarBinary("MariaDB/0-1-1085"), // stop_pos
sqltypes.NewVarBinary("9223372036854775807"), // max_tps
sqltypes.NewVarBinary("9223372036854775807"), // max_replication_lag
sqltypes.NewVarBinary("Running"), // state
},
},
}
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", posEqual, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", posEqual, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down Expand Up @@ -245,10 +249,11 @@ func TestContextCancel(t *testing.T) {
sqltypes.NewVarBinary("MariaDB/0-1-1085"), // stop_pos
sqltypes.NewVarBinary("9223372036854775807"), // max_tps
sqltypes.NewVarBinary("9223372036854775807"), // max_replication_lag
sqltypes.NewVarBinary("Running"), // state
},
},
}
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", posEqual, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", posEqual, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand All @@ -275,7 +280,7 @@ func TestContextCancel(t *testing.T) {
func TestRetryOnDeadlock(t *testing.T) {
dbClient := NewMockDBClient(t)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
deadlocked := &mysql.SQLError{Num: 1213, Message: "deadlocked"}
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", nil, deadlocked)
Expand Down
16 changes: 9 additions & 7 deletions go/vt/vttablet/tabletmanager/vreplication/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ var (
sqltypes.NULL, // stop_pos
sqltypes.NewVarBinary("9223372036854775807"), // max_tps
sqltypes.NewVarBinary("9223372036854775807"), // max_replication_lag
sqltypes.NewVarBinary("Running"), // state
},
},
}
Expand All @@ -64,7 +65,7 @@ func TestControllerKeyRange(t *testing.T) {

dbClient := binlogplayer.NewMockDBClient(t)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down Expand Up @@ -99,7 +100,7 @@ func TestControllerTables(t *testing.T) {

dbClient := binlogplayer.NewMockDBClient(t)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down Expand Up @@ -191,7 +192,7 @@ func TestControllerOverrides(t *testing.T) {

dbClient := binlogplayer.NewMockDBClient(t)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down Expand Up @@ -255,10 +256,10 @@ func TestControllerRetry(t *testing.T) {

dbClient := binlogplayer.NewMockDBClient(t)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", nil, errors.New("(expected error)"))
dbClient.ExpectRequest("update _vt.vreplication set state='Error', message='error (expected error) in selecting vreplication settings select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", nil, errors.New("(expected error)"))
dbClient.ExpectRequest("update _vt.vreplication set state='Error', message='error (expected error) in selecting vreplication settings select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down Expand Up @@ -298,10 +299,11 @@ func TestControllerStopPosition(t *testing.T) {
sqltypes.NewVarBinary("MariaDB/0-1-1235"), // stop_pos
sqltypes.NewVarBinary("9223372036854775807"), // max_tps
sqltypes.NewVarBinary("9223372036854775807"), // max_replication_lag
sqltypes.NewVarBinary("Running"), // state
},
},
}
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", withStop, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", withStop, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vttablet/tabletmanager/vreplication/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestEngineOpen(t *testing.T) {
fmt.Sprintf(`1|Running|keyspace:"%s" shard:"0" key_range:<end:"\200" > `, env.KeyspaceName),
), nil)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestEngineExec(t *testing.T) {
fmt.Sprintf(`1|Running|keyspace:"%s" shard:"0" key_range:<end:"\200" > `, env.KeyspaceName),
), nil)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down Expand Up @@ -147,7 +147,7 @@ func TestEngineExec(t *testing.T) {
fmt.Sprintf(`1|Running|keyspace:"%s" shard:"0" key_range:<end:"\200" > `, env.KeyspaceName),
), nil)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down Expand Up @@ -439,7 +439,7 @@ func TestCreateDBAndTable(t *testing.T) {
fmt.Sprintf(`1|Running|keyspace:"%s" shard:"0" key_range:<end:"\200" > `, env.KeyspaceName),
), nil)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down
Loading