Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vrepl: vplayer must rollback on exit #5842

Merged
merged 1 commit into from
Feb 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 2 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ require (
github.com/GeertJohan/go.rice v1.0.0
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect
github.com/aws/aws-sdk-go v1.28.8
github.com/boltdb/bolt v1.3.1 // indirect
github.com/bombsimon/wsl v1.2.8 // indirect
github.com/cespare/xxhash/v2 v2.1.1
github.com/cockroachdb/cmux v0.0.0-20170110192607-30d10be49292 // indirect
Expand All @@ -20,14 +19,13 @@ require (
github.com/dchest/safefile v0.0.0-20151022103144-855e8d98f185 // indirect
github.com/evanphx/json-patch v4.5.0+incompatible
github.com/go-critic/go-critic v0.4.0 // indirect
github.com/go-ini/ini v1.12.0 // indirect
github.com/go-sql-driver/mysql v1.5.0
github.com/gogo/protobuf v1.3.1
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
github.com/golang/mock v1.3.1
github.com/golang/protobuf v1.3.2
github.com/golang/snappy v0.0.0-20170215233205-553a64147049
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db
github.com/golangci/gocyclo v0.0.0-20180528144436-0a533e8fa43d // indirect
github.com/golangci/golangci-lint v1.21.0 // indirect
github.com/golangci/revgrep v0.0.0-20180812185044-276a5c0a1039 // indirect
Expand All @@ -37,12 +35,9 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/hashicorp/consul v1.5.1
github.com/hashicorp/consul/api v1.1.0
github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/hashicorp/go-rootcerts v0.0.0-20160503143440-6bb64b370b90 // indirect
github.com/hashicorp/go-uuid v1.0.1 // indirect
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/hashicorp/memberlist v0.1.4 // indirect
github.com/hashicorp/serf v0.0.0-20161207011743-d3a67ab21bc8 // indirect
github.com/icrowley/fake v0.0.0-20180203215853-4178557ae428
github.com/klauspost/crc32 v1.2.0 // indirect
github.com/klauspost/pgzip v1.2.0
Expand All @@ -51,7 +46,6 @@ require (
github.com/mattn/go-isatty v0.0.11 // indirect
github.com/mattn/go-runewidth v0.0.1 // indirect
github.com/minio/minio-go v0.0.0-20190131015406-c8a261de75c1
github.com/mitchellh/go-testing-interface v1.0.0 // indirect
github.com/olekukonko/tablewriter v0.0.0-20160115111002-cca8bbc07984
github.com/opentracing-contrib/go-grpc v0.0.0-20180928155321-4b5a12d3ff02
github.com/opentracing/opentracing-go v1.1.0
Expand Down Expand Up @@ -91,7 +85,6 @@ require (
google.golang.org/genproto v0.0.0-20190926190326-7ee9db18f195 // indirect
google.golang.org/grpc v1.24.0
gopkg.in/DataDog/dd-trace-go.v1 v1.17.0
gopkg.in/asn1-ber.v1 v1.0.0-20150924051756-4e86f4367175 // indirect
gopkg.in/ldap.v2 v2.5.0
honnef.co/go/tools v0.0.1-2019.2.3
mvdan.cc/unparam v0.0.0-20191111180625-960b1ec0f2c2 // indirect
Expand Down
156 changes: 156 additions & 0 deletions go.sum

Large diffs are not rendered by default.

21 changes: 0 additions & 21 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ func TestPlayerCopyTables(t *testing.T) {
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
"commit",
"rollback",
// The first fast-forward has no starting point. So, it just saves the current position.
"/update _vt.vreplication set pos=",
"begin",
Expand All @@ -93,14 +92,12 @@ func TestPlayerCopyTables(t *testing.T) {
"commit",
// copy of dst1 is done: delete from copy_state.
"/delete from _vt.copy_state.*dst1",
"rollback",
// The next FF executes and updates the position before copying.
"begin",
"/update _vt.vreplication set pos=",
"commit",
// Nothing to copy from yes. Delete from copy_state.
"/delete from _vt.copy_state.*yes",
"rollback",
// All tables copied. Final catch up followed by Running state.
"/update _vt.vreplication set state='Running'",
})
Expand Down Expand Up @@ -200,14 +197,12 @@ func TestPlayerCopyBigTable(t *testing.T) {
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
"commit",
"rollback",
// The first fast-forward has no starting point. So, it just saves the current position.
"/update _vt.vreplication set pos=",
"begin",
"insert into dst(id,val) values (1,'aaa')",
`/update _vt.copy_state set lastpk='fields:<name:\\"id\\" type:INT32 > rows:<lengths:1 values:\\"1\\" > ' where vrepl_id=.*`,
"commit",
"rollback",
// The next catchup executes the new row insert, but will be a no-op.
"begin",
"insert into dst(id,val) select 3, 'ccc' from dual where (3) <= (1)",
Expand All @@ -228,8 +223,6 @@ func TestPlayerCopyBigTable(t *testing.T) {
`/update _vt.copy_state set lastpk='fields:<name:\\"id\\" type:INT32 > rows:<lengths:1 values:\\"3\\" > ' where vrepl_id=.*`,
"commit",
"/delete from _vt.copy_state.*dst",
// rollback is a no-op because the delete is autocommitted.
"rollback",
// Copy is done. Go into running state.
"/update _vt.vreplication set state='Running'",
// All tables copied. Final catch up followed by Running state.
Expand Down Expand Up @@ -331,14 +324,12 @@ func TestPlayerCopyWildcardRule(t *testing.T) {
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
"commit",
"rollback",
// The first fast-forward has no starting point. So, it just saves the current position.
"/update _vt.vreplication set pos=",
"begin",
"insert into src(id,val) values (1,'aaa')",
`/update _vt.copy_state set lastpk='fields:<name:\\"id\\" type:INT32 > rows:<lengths:1 values:\\"1\\" > ' where vrepl_id=.*`,
"commit",
"rollback",
// The next catchup executes the new row insert, but will be a no-op.
"begin",
"insert into src(id,val) select 3, 'ccc' from dual where (3) <= (1)",
Expand All @@ -359,8 +350,6 @@ func TestPlayerCopyWildcardRule(t *testing.T) {
`/update _vt.copy_state set lastpk='fields:<name:\\"id\\" type:INT32 > rows:<lengths:1 values:\\"3\\" > ' where vrepl_id=.*`,
"commit",
"/delete from _vt.copy_state.*src",
// rollback is a no-op because the delete is autocommitted.
"rollback",
// Copy is done. Go into running state.
"/update _vt.vreplication set state='Running'",
// All tables copied. Final catch up followed by Running state.
Expand Down Expand Up @@ -505,12 +494,10 @@ func TestPlayerCopyTableContinuation(t *testing.T) {
"insert into dst1(id,val) values (7,'insert out'), (8,'no change'), (10,'updated'), (12,'move out')",
`/update _vt.copy_state set lastpk='fields:<name:\\"id1\\" type:INT32 > fields:<name:\\"id2\\" type:INT32 > rows:<lengths:2 lengths:1 values:\\"126\\" > ' where vrepl_id=.*`,
"/delete from _vt.copy_state.*dst1",
"rollback",
// Copy again. There should be no events for catchup.
"insert into not_copied(id,val) values (1,'bbb')",
`/update _vt.copy_state set lastpk='fields:<name:\\\"id\\\" type:INT32 > rows:<lengths:1 values:\\\"1\\\" > ' where vrepl_id=.*`,
"/delete from _vt.copy_state.*not_copied",
"rollback",
})
// Explicitly eat the Running state query. You can't make expectNontxQueries
// wait for it because it ignores _vt.vreplication events.
Expand Down Expand Up @@ -606,7 +593,6 @@ func TestPlayerCopyWildcardTableContinuation(t *testing.T) {
"insert into dst(id,val) values (3,'uncopied'), (4,'new')",
`/update _vt.copy_state set lastpk.*`,
"/delete from _vt.copy_state.*dst",
"rollback",
})
// Explicitly eat the Running state query. You can't make expectNontxQueries
// wait for it because it ignores _vt.vreplication events.
Expand Down Expand Up @@ -654,7 +640,6 @@ func TestPlayerCopyTablesNone(t *testing.T) {
"begin",
"/update _vt.vreplication set state='Stopped'",
"commit",
"rollback",
})
}

Expand Down Expand Up @@ -706,7 +691,6 @@ func TestPlayerCopyTablesStopAfterCopy(t *testing.T) {
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
"commit",
"rollback",
// The first fast-forward has no starting point. So, it just saves the current position.
"/update _vt.vreplication set pos=",
"begin",
Expand All @@ -715,7 +699,6 @@ func TestPlayerCopyTablesStopAfterCopy(t *testing.T) {
"commit",
// copy of dst1 is done: delete from copy_state.
"/delete from _vt.copy_state.*dst1",
"rollback",
// All tables copied. Stop vreplication because we requested it.
"/update _vt.vreplication set state='Stopped'",
})
Expand Down Expand Up @@ -784,10 +767,7 @@ func TestPlayerCopyTableCancel(t *testing.T) {
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
"commit",
"rollback",
// The first copy will do nothing because we set the timeout to be too low.
// We should expect it to do an empty rollback.
"rollback",
// The next copy should proceed as planned because we've made the timeout high again.
// The first fast-forward has no starting point. So, it just saves the current position.
"/update _vt.vreplication set pos=",
Expand All @@ -797,7 +777,6 @@ func TestPlayerCopyTableCancel(t *testing.T) {
"commit",
// copy of dst1 is done: delete from copy_state.
"/delete from _vt.copy_state.*dst1",
"rollback",
// All tables copied. Go into running state.
"/update _vt.vreplication set state='Running'",
})
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vdbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ func (vc *vdbClient) Commit() error {
}

func (vc *vdbClient) Rollback() error {
if !vc.InTransaction {
return nil
}
if err := vc.DBClient.Rollback(); err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row
func (vp *vplayer) updatePos(ts int64) (posReached bool, err error) {
update := binlogplayer.GenerateUpdatePos(vp.vr.id, vp.pos, time.Now().Unix(), ts)
if _, err := vp.vr.dbClient.Execute(update); err != nil {
vp.vr.dbClient.Rollback()
return false, fmt.Errorf("error %v updating position", err)
}
vp.unsavedEvent = nil
Expand All @@ -223,6 +222,8 @@ func (vp *vplayer) updatePos(ts int64) (posReached bool, err error) {
}

func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
defer vp.vr.dbClient.Rollback()

// If we're not running, set SecondsBehindMaster to be very high.
// TODO(sougou): if we also stored the time of the last event, we
// can estimate this value more accurately.
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func TestPlayerStatementModeWithFilter(t *testing.T) {
// It does not work when filter is enabled
output := []string{
"begin",
"rollback",
"/update _vt.vreplication set message='filter rules are not supported for SBR",
}

Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ func (vr *vreplicator) Replicate(ctx context.Context) error {
vr.tableKeys = tableKeys

for {
// This rollback is a no-op. It's here for safety
// in case the functions below leave transactions open.
vr.dbClient.Rollback()
deepthi marked this conversation as resolved.
Show resolved Hide resolved

settings, numTablesToCopy, err := vr.readSettings(ctx)
if err != nil {
return err
Expand Down