Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vreplication: code improvement #4946

Merged
merged 3 commits into from
Jun 26, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 18 additions & 30 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,11 @@ func (tp *TablePlan) MarshalJSON() ([]byte, error) {
return json.Marshal(&v)
}

func (tp *TablePlan) generateBulkInsert(rows *binlogdatapb.VStreamRowsResponse) (string, error) {
func (tp *TablePlan) applyBulkInsert(rows *binlogdatapb.VStreamRowsResponse, executor func(string) (*sqltypes.Result, error)) (*sqltypes.Result, error) {
bindvars := make(map[string]*querypb.BindVariable, len(tp.Fields))
var buf strings.Builder
if err := tp.BulkInsertFront.Append(&buf, nil, nil); err != nil {
return "", err
return nil, err
}
buf.WriteString(" values ")
separator := ""
Expand All @@ -179,10 +179,10 @@ func (tp *TablePlan) generateBulkInsert(rows *binlogdatapb.VStreamRowsResponse)
if tp.BulkInsertOnDup != nil {
tp.BulkInsertOnDup.Append(&buf, nil, nil)
}
return buf.String(), nil
return executor(buf.String())
}

func (tp *TablePlan) generateStatements(rowChange *binlogdatapb.RowChange) ([]string, error) {
func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor func(string) (*sqltypes.Result, error)) (*sqltypes.Result, error) {
// MakeRowTrusted is needed here because Proto3ToResult is not convenient.
var before, after bool
bindvars := make(map[string]*querypb.BindVariable, len(tp.Fields))
Expand All @@ -202,47 +202,35 @@ func (tp *TablePlan) generateStatements(rowChange *binlogdatapb.RowChange) ([]st
}
switch {
case !before && after:
query, err := tp.Insert.GenerateQuery(bindvars, nil)
if err != nil {
return nil, err
}
return []string{query}, nil
return execParsedQuery(tp.Insert, bindvars, executor)
case before && !after:
if tp.Delete == nil {
return nil, nil
}
query, err := tp.Delete.GenerateQuery(bindvars, nil)
if err != nil {
return nil, err
}
return []string{query}, nil
return execParsedQuery(tp.Delete, bindvars, executor)
case before && after:
if !tp.pkChanged(bindvars) {
query, err := tp.Update.GenerateQuery(bindvars, nil)
if err != nil {
return nil, err
}
return []string{query}, nil
return execParsedQuery(tp.Update, bindvars, executor)
}

queries := make([]string, 0, 2)
if tp.Delete != nil {
query, err := tp.Delete.GenerateQuery(bindvars, nil)
if err != nil {
if _, err := execParsedQuery(tp.Delete, bindvars, executor); err != nil {
return nil, err
}
queries = append(queries, query)
}
query, err := tp.Insert.GenerateQuery(bindvars, nil)
if err != nil {
return nil, err
}
queries = append(queries, query)
return queries, nil
return execParsedQuery(tp.Insert, bindvars, executor)
}
// Unreachable.
return nil, nil
}

func execParsedQuery(pq *sqlparser.ParsedQuery, bindvars map[string]*querypb.BindVariable, executor func(string) (*sqltypes.Result, error)) (*sqltypes.Result, error) {
sql, err := pq.GenerateQuery(bindvars, nil)
if err != nil {
return nil, err
}
return executor(sql)
}

func (tp *TablePlan) pkChanged(bindvars map[string]*querypb.BindVariable) bool {
for _, pkref := range tp.PKReferences {
v1, _ := sqltypes.BindVariableToValue(bindvars["b_"+pkref])
Expand Down
26 changes: 14 additions & 12 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (vc *vcopier) initTablesForCopy(ctx context.Context) error {
fmt.Fprintf(&buf, "%s(%d, %s)", prefix, vc.vr.id, encodeString(name))
prefix = ", "
}
if _, err := vc.vr.dbClient.ExecuteFetch(buf.String(), 1); err != nil {
if _, err := vc.vr.dbClient.Execute(buf.String()); err != nil {
return err
}
if err := vc.vr.setState(binlogplayer.VReplicationCopying, ""); err != nil {
Expand All @@ -84,7 +84,7 @@ func (vc *vcopier) initTablesForCopy(ctx context.Context) error {
}

func (vc *vcopier) copyNext(ctx context.Context, settings binlogplayer.VRSettings) error {
qr, err := vc.vr.dbClient.ExecuteFetch(fmt.Sprintf("select table_name, lastpk from _vt.copy_state where vrepl_id=%d", vc.vr.id), 10000)
qr, err := vc.vr.dbClient.Execute(fmt.Sprintf("select table_name, lastpk from _vt.copy_state where vrepl_id=%d", vc.vr.id))
if err != nil {
return err
}
Expand Down Expand Up @@ -232,10 +232,17 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma
// to data size, this should map to a uniform amount of pages affected
// per statement. A packet size of 30K will roughly translate to 8
// mysql pages of 4K each.
query, err := vc.tablePlan.generateBulkInsert(rows)
if err := vc.vr.dbClient.Begin(); err != nil {
return err
}

_, err = vc.tablePlan.applyBulkInsert(rows, func(sql string) (*sqltypes.Result, error) {
return vc.vr.dbClient.ExecuteWithRetry(ctx, sql)
})
if err != nil {
return err
}

var buf bytes.Buffer
err = proto.CompactText(&buf, &querypb.QueryResult{
Fields: pkfields,
Expand All @@ -254,15 +261,10 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma
if err != nil {
return err
}
if err := vc.vr.dbClient.Begin(); err != nil {
return err
}
if _, err := vc.vr.dbClient.ExecuteFetch(query, 0); err != nil {
return err
}
if _, err := vc.vr.dbClient.ExecuteFetch(updateState, 0); err != nil {
if _, err := vc.vr.dbClient.Execute(updateState); err != nil {
return err
}

if err := vc.vr.dbClient.Commit(); err != nil {
return err
}
Expand All @@ -279,7 +281,7 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma
}
buf := sqlparser.NewTrackedBuffer(nil)
buf.Myprintf("delete from _vt.copy_state where vrepl_id=%s and table_name=%s", strconv.Itoa(int(vc.vr.id)), encodeString(tableName))
if _, err := vc.vr.dbClient.ExecuteFetch(buf.String(), 0); err != nil {
if _, err := vc.vr.dbClient.Execute(buf.String()); err != nil {
return err
}
return nil
Expand All @@ -296,7 +298,7 @@ func (vc *vcopier) fastForward(ctx context.Context, copyState map[string]*sqltyp
}
if settings.StartPos.IsZero() {
update := binlogplayer.GenerateUpdatePos(vc.vr.id, pos, time.Now().Unix(), 0)
_, err := vc.vr.dbClient.ExecuteFetch(update, 0)
_, err := vc.vr.dbClient.Execute(update)
return err
}
return newVPlayer(vc.vr, settings, copyState, pos).play(ctx)
Expand Down
45 changes: 40 additions & 5 deletions go/vt/vttablet/tabletmanager/vreplication/vdbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@ limitations under the License.
package vreplication

import (
"io"
"time"

"golang.org/x/net/context"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/log"
)

// vdbClient is a wrapper on binlogplayer.DBClient.
Expand Down Expand Up @@ -83,17 +87,48 @@ func (vc *vdbClient) ExecuteFetch(query string, maxrows int) (*sqltypes.Result,
return vc.DBClient.ExecuteFetch(query, maxrows)
}

func (vc *vdbClient) Retry() error {
// Execute is ExecuteFetch without the maxrows.
func (vc *vdbClient) Execute(query string) (*sqltypes.Result, error) {
return vc.ExecuteFetch(query, 100000)
}

func (vc *vdbClient) ExecuteWithRetry(ctx context.Context, query string) (*sqltypes.Result, error) {
qr, err := vc.Execute(query)
for err != nil {
if sqlErr, ok := err.(*mysql.SQLError); ok && sqlErr.Number() == mysql.ERLockDeadlock || sqlErr.Number() == mysql.ERLockWaitTimeout {
log.Infof("retryable error: %v, waiting for %v and retrying", sqlErr, dbLockRetryDelay)
if err := vc.Rollback(); err != nil {
return nil, err
}
time.Sleep(dbLockRetryDelay)
// Check context here. Otherwise this can become an infinite loop.
select {
case <-ctx.Done():
return nil, io.EOF
default:
}
qr, err = vc.Retry()
continue
}
return qr, err
}
return qr, nil
}

func (vc *vdbClient) Retry() (*sqltypes.Result, error) {
var qr *sqltypes.Result
for _, q := range vc.queries {
if q == "begin" {
if err := vc.Begin(); err != nil {
return err
return nil, err
}
continue
}
if _, err := vc.DBClient.ExecuteFetch(q, 10000); err != nil {
return err
result, err := vc.DBClient.ExecuteFetch(q, 100000)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is using the default maxRows, can you replace it with a call to Execute?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was set to 100K rows based on the max packet size of 30K. There's actually no need to set a limit on number of rows because it can't exceed the max packet size. However, since max packet size is configurable, I think I should set it to max packet size itself (assuming worst case of 1 byte per row).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, this is being passed down to DBClient. That sounds like a good idea.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As for maxrows in the original ExecuteFetch, the signature cannot be changed until old binlog player is deprecated.

if err != nil {
return nil, err
}
qr = result
}
return nil
return qr, nil
}
39 changes: 6 additions & 33 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,22 +176,19 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row
return fmt.Errorf("unexpected event on table %s", rowEvent.TableName)
}
for _, change := range rowEvent.RowChanges {
queries, err := tplan.generateStatements(change)
_, err := tplan.applyChange(change, func(sql string) (*sqltypes.Result, error) {
return vp.vr.dbClient.ExecuteWithRetry(ctx, sql)
})
if err != nil {
return err
}
for _, query := range queries {
if err := vp.exec(ctx, query); err != nil {
return err
}
}
}
return nil
}

func (vp *vplayer) updatePos(ts int64) (posReached bool, err error) {
update := binlogplayer.GenerateUpdatePos(vp.vr.id, vp.pos, time.Now().Unix(), ts)
if _, err := vp.vr.dbClient.ExecuteFetch(update, 0); err != nil {
if _, err := vp.vr.dbClient.Execute(update); err != nil {
vp.vr.dbClient.Rollback()
return false, fmt.Errorf("error %v updating position", err)
}
Expand All @@ -209,30 +206,6 @@ func (vp *vplayer) updatePos(ts int64) (posReached bool, err error) {
return posReached, nil
}

func (vp *vplayer) exec(ctx context.Context, sql string) error {
vp.vr.stats.Timings.Record("query", time.Now())
_, err := vp.vr.dbClient.ExecuteFetch(sql, 0)
for err != nil {
if sqlErr, ok := err.(*mysql.SQLError); ok && sqlErr.Number() == mysql.ERLockDeadlock || sqlErr.Number() == mysql.ERLockWaitTimeout {
log.Infof("retryable error: %v, waiting for %v and retrying", sqlErr, dbLockRetryDelay)
if err := vp.vr.dbClient.Rollback(); err != nil {
return err
}
time.Sleep(dbLockRetryDelay)
// Check context here. Otherwise this can become an infinite loop.
select {
case <-ctx.Done():
return io.EOF
default:
}
err = vp.vr.dbClient.Retry()
continue
}
return err
}
return nil
}

func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
for {
items, err := relay.Fetch()
Expand Down Expand Up @@ -403,7 +376,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
}
return io.EOF
case binlogdatapb.OnDDLAction_EXEC:
if err := vp.exec(ctx, event.Ddl); err != nil {
if _, err := vp.vr.dbClient.ExecuteWithRetry(ctx, event.Ddl); err != nil {
return err
}
posReached, err := vp.updatePos(event.Timestamp)
Expand All @@ -414,7 +387,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
return io.EOF
}
case binlogdatapb.OnDDLAction_EXEC_IGNORE:
if err := vp.exec(ctx, event.Ddl); err != nil {
if _, err := vp.vr.dbClient.ExecuteWithRetry(ctx, event.Ddl); err != nil {
log.Infof("Ignoring error: %v for DDL: %s", err, event.Ddl)
}
posReached, err := vp.updatePos(event.Timestamp)
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (vr *vreplicator) readSettings(ctx context.Context) (settings binlogplayer.
}

query := fmt.Sprintf("select count(*) from _vt.copy_state where vrepl_id=%d", vr.id)
qr, err := vr.dbClient.ExecuteFetch(query, 10)
qr, err := vr.dbClient.Execute(query)
if err != nil {
// If it's a not found error, create it.
merr, isSQLErr := err.(*mysql.SQLError)
Expand All @@ -143,13 +143,13 @@ func (vr *vreplicator) readSettings(ctx context.Context) (settings binlogplayer.
}
log.Info("Looks like _vt.copy_state table may not exist. Trying to create... ")
for _, query := range CreateCopyState {
if _, merr := vr.dbClient.ExecuteFetch(query, 0); merr != nil {
if _, merr := vr.dbClient.Execute(query); merr != nil {
log.Errorf("Failed to ensure _vt.copy_state table exists: %v", merr)
return settings, numTablesToCopy, err
}
}
// Redo the read.
qr, err = vr.dbClient.ExecuteFetch(query, 10)
qr, err = vr.dbClient.Execute(query)
if err != nil {
return settings, numTablesToCopy, err
}
Expand All @@ -170,7 +170,7 @@ func (vr *vreplicator) setMessage(message string) error {
Message: message,
})
query := fmt.Sprintf("update _vt.vreplication set message=%v where id=%v", encodeString(message), vr.id)
if _, err := vr.dbClient.ExecuteFetch(query, 1); err != nil {
if _, err := vr.dbClient.Execute(query); err != nil {
return fmt.Errorf("could not set message: %v: %v", query, err)
}
return nil
Expand Down