From 50c8e92e52d32ceb1ec8fb2be2e3f88377dd8287 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Tue, 24 Oct 2023 00:38:12 +0200 Subject: [PATCH 1/4] Wait for each table diff's shard streamers to clean up before starting the next table or exiting. Also limit the size of the error being written to the vdiff record since we have noticed that some errors, which contain GTIDs, can be arbitrarily large. Signed-off-by: Rohit Nayak --- go/vt/vttablet/tabletmanager/vdiff/controller.go | 13 ++++++++++--- .../vttablet/tabletmanager/vdiff/table_differ.go | 16 ++++++++++++++-- .../tabletmanager/vdiff/workflow_differ.go | 6 ++++++ 3 files changed, 30 insertions(+), 5 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vdiff/controller.go b/go/vt/vttablet/tabletmanager/vdiff/controller.go index de93895a4eb..1ba6e264b8c 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/controller.go +++ b/go/vt/vttablet/tabletmanager/vdiff/controller.go @@ -18,7 +18,6 @@ package vdiff import ( "context" - "errors" "fmt" "strings" "time" @@ -162,13 +161,21 @@ func (ct *controller) updateState(dbClient binlogplayer.DBClient, state VDiffSta extraCols = ", completed_at = utc_timestamp()" default: } + var errorString string if err == nil { // Clear out any previous error for the vdiff on this shard - err = errors.New("") + errorString = "" + } else { + // limit the error string to be within column length of `last_error` + const MaxErrorLength = 500 + errorString = err.Error() + if len(errorString) > MaxErrorLength { + errorString = errorString[:MaxErrorLength] + } } query := sqlparser.BuildParsedQuery(sqlUpdateVDiffState, encodeString(string(state)), - encodeString(err.Error()), + encodeString(errorString), extraCols, ct.id, ) diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index e65a0bad253..dd1b5f07e7d 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -72,6 +72,12 @@ 
type tableDiffer struct { sourceQuery string table *tabletmanagerdatapb.TableDefinition lastPK *querypb.QueryResult + + // wgShardStreamers is used, with a cancellable context, to wait for all shard streamers + // to finish after each diff is complete. + wgShardStreamers sync.WaitGroup + shardStreamsCtx context.Context + shardStreamsCancel context.CancelFunc } func newTableDiffer(wd *workflowDiffer, table *tabletmanagerdatapb.TableDefinition, sourceQuery string) *tableDiffer { @@ -121,19 +127,23 @@ func (td *tableDiffer) initialize(ctx context.Context) error { } }() + shardStreamsCtx, shardStreamsCancel := context.WithCancel(ctx) + td.shardStreamsCtx = shardStreamsCtx + td.shardStreamsCancel = shardStreamsCancel + if err := td.selectTablets(ctx); err != nil { return err } if err := td.syncSourceStreams(ctx); err != nil { return err } - if err := td.startSourceDataStreams(ctx); err != nil { + if err := td.startSourceDataStreams(shardStreamsCtx); err != nil { return err } if err := td.syncTargetStreams(ctx); err != nil { return err } - if err := td.startTargetDataStream(ctx); err != nil { + if err := td.startTargetDataStream(shardStreamsCtx); err != nil { return err } td.setupRowSorters() @@ -353,11 +363,13 @@ func (td *tableDiffer) restartTargetVReplicationStreams(ctx context.Context) err } func (td *tableDiffer) streamOneShard(ctx context.Context, participant *shardStreamer, query string, lastPK *querypb.QueryResult, gtidch chan string) { + td.wgShardStreamers.Add(1) log.Infof("streamOneShard Start on %s using query: %s", participant.tablet.Alias.String(), query) defer func() { log.Infof("streamOneShard End on %s", participant.tablet.Alias.String()) close(participant.result) close(gtidch) + td.wgShardStreamers.Done() }() participant.err = func() error { conn, err := tabletconn.GetDialer()(participant.tablet, false) diff --git a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go index 
e27d421d398..10cfcdda17f 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go @@ -134,6 +134,11 @@ func (wd *workflowDiffer) reconcileExtraRows(dr *DiffReport, maxExtraRowsToCompa } func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.DBClient, td *tableDiffer) error { + defer func() { + // Wait for all the shard streams to finish before returning. + td.wgShardStreamers.Wait() + }() + select { case <-ctx.Done(): return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired") @@ -155,6 +160,7 @@ func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.D log.Errorf("Encountered an error diffing table %s for vdiff %s: %v", td.table.Name, wd.ct.uuid, err) return err } + td.shardStreamsCancel() log.Infof("Table diff done on table %s for vdiff %s with report: %+v", td.table.Name, wd.ct.uuid, dr) if dr.ExtraRowsSource > 0 || dr.ExtraRowsTarget > 0 { if err := wd.reconcileExtraRows(dr, wd.opts.CoreOptions.MaxExtraRowsToCompare); err != nil { From a00d764048ec94615f98d9c0b2c94e5a0ebf4772 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 24 Oct 2023 12:40:06 -0400 Subject: [PATCH 2/4] Address my own nits. 
Signed-off-by: Matt Lord --- go/vt/sidecardb/schema/vdiff/vdiff.sql | 2 +- .../tabletmanager/vdiff/controller.go | 13 +++-------- go/vt/vttablet/tabletmanager/vdiff/schema.go | 2 +- .../tabletmanager/vdiff/table_differ.go | 23 +++++++++---------- .../tabletmanager/vdiff/workflow_differ.go | 3 ++- 5 files changed, 18 insertions(+), 25 deletions(-) diff --git a/go/vt/sidecardb/schema/vdiff/vdiff.sql b/go/vt/sidecardb/schema/vdiff/vdiff.sql index 5eae9270460..52392bde427 100644 --- a/go/vt/sidecardb/schema/vdiff/vdiff.sql +++ b/go/vt/sidecardb/schema/vdiff/vdiff.sql @@ -28,7 +28,7 @@ CREATE TABLE IF NOT EXISTS vdiff `started_at` timestamp NULL DEFAULT NULL, `liveness_timestamp` timestamp NULL DEFAULT NULL, `completed_at` timestamp NULL DEFAULT NULL, - `last_error` varbinary(512) DEFAULT NULL, + `last_error` varbinary(1024) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `uuid_idx` (`vdiff_uuid`), KEY `state` (`state`), diff --git a/go/vt/vttablet/tabletmanager/vdiff/controller.go b/go/vt/vttablet/tabletmanager/vdiff/controller.go index 1ba6e264b8c..de93895a4eb 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/controller.go +++ b/go/vt/vttablet/tabletmanager/vdiff/controller.go @@ -18,6 +18,7 @@ package vdiff import ( "context" + "errors" "fmt" "strings" "time" @@ -161,21 +162,13 @@ func (ct *controller) updateState(dbClient binlogplayer.DBClient, state VDiffSta extraCols = ", completed_at = utc_timestamp()" default: } - var errorString string if err == nil { // Clear out any previous error for the vdiff on this shard - errorString = "" - } else { - // limit the error string to be within column length of `last_error` - const MaxErrorLength = 500 - errorString = err.Error() - if len(errorString) > MaxErrorLength { - errorString = errorString[:MaxErrorLength] - } + err = errors.New("") } query := sqlparser.BuildParsedQuery(sqlUpdateVDiffState, encodeString(string(state)), - encodeString(errorString), + encodeString(err.Error()), extraCols, ct.id, ) diff --git 
a/go/vt/vttablet/tabletmanager/vdiff/schema.go b/go/vt/vttablet/tabletmanager/vdiff/schema.go index f9f48cc72e9..4a00b2194ba 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/schema.go +++ b/go/vt/vttablet/tabletmanager/vdiff/schema.go @@ -39,7 +39,7 @@ const ( from _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) where vd.id = %a` // sqlUpdateVDiffState has a penultimate placeholder for any additional columns you want to update, e.g. `, foo = 1` - sqlUpdateVDiffState = "update _vt.vdiff set state = %s, last_error = %s %s where id = %d" + sqlUpdateVDiffState = "update _vt.vdiff set state = %s, last_error = left(%s, 1024) %s where id = %d" sqlUpdateVDiffStopped = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.state = 'stopped', vdt.state = 'stopped', vd.last_error = '' where vd.id = vdt.vdiff_id and vd.id = %a and vd.state != 'completed'` sqlGetVReplicationEntry = "select * from _vt.vreplication %s" diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index dd1b5f07e7d..de035d0df70 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -34,11 +34,6 @@ import ( "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/log" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - querypb "vitess.io/vitess/go/vt/proto/query" - tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" - "vitess.io/vitess/go/vt/proto/topodata" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" @@ -46,6 +41,12 @@ import ( "vitess.io/vitess/go/vt/vtgate/engine" "vitess.io/vitess/go/vt/vtgate/evalengine" "vitess.io/vitess/go/vt/vttablet/tabletconn" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + tabletmanagerdatapb 
"vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) // how long to wait for background operations to complete @@ -127,9 +128,7 @@ func (td *tableDiffer) initialize(ctx context.Context) error { } }() - shardStreamsCtx, shardStreamsCancel := context.WithCancel(ctx) - td.shardStreamsCtx = shardStreamsCtx - td.shardStreamsCancel = shardStreamsCancel + td.shardStreamsCtx, td.shardStreamsCancel = context.WithCancel(ctx) if err := td.selectTablets(ctx); err != nil { return err @@ -137,13 +136,13 @@ func (td *tableDiffer) initialize(ctx context.Context) error { if err := td.syncSourceStreams(ctx); err != nil { return err } - if err := td.startSourceDataStreams(shardStreamsCtx); err != nil { + if err := td.startSourceDataStreams(td.shardStreamsCtx); err != nil { return err } if err := td.syncTargetStreams(ctx); err != nil { return err } - if err := td.startTargetDataStream(shardStreamsCtx); err != nil { + if err := td.startTargetDataStream(td.shardStreamsCtx); err != nil { return err } td.setupRowSorters() @@ -213,7 +212,7 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error { var ( wg sync.WaitGroup sourceErr, targetErr error - targetTablet *topodata.Tablet + targetTablet *topodatapb.Tablet ) // The cells from the vdiff record are a comma separated list. 
@@ -264,7 +263,7 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error { return targetErr } -func pickTablet(ctx context.Context, ts *topo.Server, cells []string, localCell, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) { +func pickTablet(ctx context.Context, ts *topo.Server, cells []string, localCell, keyspace, shard, tabletTypes string) (*topodatapb.Tablet, error) { tp, err := discovery.NewTabletPicker(ctx, ts, cells, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{}) if err != nil { return nil, err diff --git a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go index 10cfcdda17f..adaf87d6ee4 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go @@ -135,6 +135,8 @@ func (wd *workflowDiffer) reconcileExtraRows(dr *DiffReport, maxExtraRowsToCompa func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.DBClient, td *tableDiffer) error { defer func() { + td.shardStreamsCancel() + // Wait for all the shard streams to finish before returning. 
td.wgShardStreamers.Wait() }() @@ -160,7 +162,6 @@ func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.D log.Errorf("Encountered an error diffing table %s for vdiff %s: %v", td.table.Name, wd.ct.uuid, err) return err } - td.shardStreamsCancel() log.Infof("Table diff done on table %s for vdiff %s with report: %+v", td.table.Name, wd.ct.uuid, dr) if dr.ExtraRowsSource > 0 || dr.ExtraRowsTarget > 0 { if err := wd.reconcileExtraRows(dr, wd.opts.CoreOptions.MaxExtraRowsToCompare); err != nil { From 076ab678ba5b84ed6d7c06593ff21a560a090f15 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 24 Oct 2023 17:28:40 -0400 Subject: [PATCH 3/4] Only call cancel if it's non-nil Signed-off-by: Matt Lord --- go/vt/vttablet/tabletmanager/vdiff/table_differ.go | 2 +- go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index de035d0df70..d3761436285 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -362,8 +362,8 @@ func (td *tableDiffer) restartTargetVReplicationStreams(ctx context.Context) err } func (td *tableDiffer) streamOneShard(ctx context.Context, participant *shardStreamer, query string, lastPK *querypb.QueryResult, gtidch chan string) { - td.wgShardStreamers.Add(1) log.Infof("streamOneShard Start on %s using query: %s", participant.tablet.Alias.String(), query) + td.wgShardStreamers.Add(1) defer func() { log.Infof("streamOneShard End on %s", participant.tablet.Alias.String()) close(participant.result) diff --git a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go index adaf87d6ee4..d7d2583a5d3 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go @@ -135,8 +135,9 @@ func 
(wd *workflowDiffer) reconcileExtraRows(dr *DiffReport, maxExtraRowsToCompa func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.DBClient, td *tableDiffer) error { defer func() { - td.shardStreamsCancel() - + if td.shardStreamsCancel != nil { + td.shardStreamsCancel() + } // Wait for all the shard streams to finish before returning. td.wgShardStreamers.Wait() }() From 04f8d606043a047d820b21d6afc59050e70a6ead Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 24 Oct 2023 17:34:18 -0400 Subject: [PATCH 4/4] Adjust unit tests Signed-off-by: Matt Lord --- go/vt/vttablet/tabletmanager/vdiff/engine_test.go | 8 ++++---- go/vt/vttablet/tabletmanager/vdiff/framework_test.go | 2 +- go/vt/vttablet/tabletmanager/vdiff/schema.go | 3 ++- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vdiff/engine_test.go b/go/vt/vttablet/tabletmanager/vdiff/engine_test.go index cf77502fb32..75b0e37d630 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/engine_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/engine_test.go @@ -85,7 +85,7 @@ func TestEngineOpen(t *testing.T) { ), nil) // Now let's short circuit the vdiff as we know that the open has worked as expected. 
- shortCircuitTestAfterQuery("update _vt.vdiff set state = 'started', last_error = '' , started_at = utc_timestamp() where id = 1", vdiffenv.dbClient) + shortCircuitTestAfterQuery("update _vt.vdiff set state = 'started', last_error = left('', 1024) , started_at = utc_timestamp() where id = 1", vdiffenv.dbClient) vdenv.vde.Open(context.Background(), vdiffenv.vre) defer vdenv.vde.Close() @@ -132,7 +132,7 @@ func TestVDiff(t *testing.T) { ), fmt.Sprintf("1|%s|%s|%s||9223372036854775807|9223372036854775807||PRIMARY,REPLICA|1669511347|0|Running||%s|200||1669511347|1|0||1", vdiffenv.workflow, vreplSource, vdiffSourceGtid, vdiffDBName), ), nil) - vdenv.dbClient.ExpectRequest("update _vt.vdiff set state = 'started', last_error = '' , started_at = utc_timestamp() where id = 1", singleRowAffected, nil) + vdenv.dbClient.ExpectRequest("update _vt.vdiff set state = 'started', last_error = left('', 1024) , started_at = utc_timestamp() where id = 1", singleRowAffected, nil) vdenv.dbClient.ExpectRequest("insert into _vt.vdiff_log(vdiff_id, message) values (1, 'State changed to: started')", singleRowAffected, nil) vdenv.dbClient.ExpectRequest(`select vdt.lastpk as lastpk, vdt.mismatch as mismatch, vdt.report as report from _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) @@ -194,7 +194,7 @@ func TestVDiff(t *testing.T) { vdenv.dbClient.ExpectRequest("update _vt.vdiff_table set state = 'completed' where vdiff_id = 1 and table_name = 't1'", singleRowAffected, nil) vdenv.dbClient.ExpectRequest(`insert into _vt.vdiff_log(vdiff_id, message) values (1, 'completed: table \'t1\'')`, singleRowAffected, nil) vdenv.dbClient.ExpectRequest("select table_name as table_name from _vt.vdiff_table where vdiff_id = 1 and state != 'completed'", singleRowAffected, nil) - vdenv.dbClient.ExpectRequest("update _vt.vdiff set state = 'completed', last_error = '' , completed_at = utc_timestamp() where id = 1", singleRowAffected, nil) + vdenv.dbClient.ExpectRequest("update _vt.vdiff 
set state = 'completed', last_error = left('', 1024) , completed_at = utc_timestamp() where id = 1", singleRowAffected, nil) vdenv.dbClient.ExpectRequest("insert into _vt.vdiff_log(vdiff_id, message) values (1, 'State changed to: completed')", singleRowAffected, nil) vdenv.vde.mu.Lock() @@ -271,7 +271,7 @@ func TestEngineRetryErroredVDiffs(t *testing.T) { ), nil) // At this point we know that we kicked off the expected retry so we can short circit the vdiff. - shortCircuitTestAfterQuery(fmt.Sprintf("update _vt.vdiff set state = 'started', last_error = '' , started_at = utc_timestamp() where id = %s", id), vdiffenv.dbClient) + shortCircuitTestAfterQuery(fmt.Sprintf("update _vt.vdiff set state = 'started', last_error = left('', 1024) , started_at = utc_timestamp() where id = %s", id), vdiffenv.dbClient) expectedControllerCnt++ } diff --git a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go index 9f69e9ed86d..d5e8c134814 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go @@ -197,7 +197,7 @@ func resetBinlogClient() { // has verified the necessary behavior. 
func shortCircuitTestAfterQuery(query string, dbClient *binlogplayer.MockDBClient) { dbClient.ExpectRequest(query, singleRowAffected, fmt.Errorf("Short circuiting test")) - dbClient.ExpectRequest("update _vt.vdiff set state = 'error', last_error = 'Short circuiting test' where id = 1", singleRowAffected, nil) + dbClient.ExpectRequest("update _vt.vdiff set state = 'error', last_error = left('Short circuiting test', 1024) where id = 1", singleRowAffected, nil) dbClient.ExpectRequest("insert into _vt.vdiff_log(vdiff_id, message) values (1, 'State changed to: error')", singleRowAffected, nil) dbClient.ExpectRequest("insert into _vt.vdiff_log(vdiff_id, message) values (1, 'Error: Short circuiting test')", singleRowAffected, nil) } diff --git a/go/vt/vttablet/tabletmanager/vdiff/schema.go b/go/vt/vttablet/tabletmanager/vdiff/schema.go index 4a00b2194ba..72da9f15ada 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/schema.go +++ b/go/vt/vttablet/tabletmanager/vdiff/schema.go @@ -38,7 +38,8 @@ const ( IF(vdt.mismatch = 1, 1, 0) as has_mismatch, vdt.report as report from _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) where vd.id = %a` - // sqlUpdateVDiffState has a penultimate placeholder for any additional columns you want to update, e.g. `, foo = 1` + // sqlUpdateVDiffState has a penultimate placeholder for any additional columns you want to update, e.g. `, foo = 1`. + // It also truncates the error if needed to ensure that we can save the state when the error text is very long. sqlUpdateVDiffState = "update _vt.vdiff set state = %s, last_error = left(%s, 1024) %s where id = %d" sqlUpdateVDiffStopped = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.state = 'stopped', vdt.state = 'stopped', vd.last_error = '' where vd.id = vdt.vdiff_id and vd.id = %a and vd.state != 'completed'`