Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Guard against unsafe _vt.vreplication writes #14797

Merged
merged 21 commits into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func TestReparentWithDownReplica(t *testing.T) {
// insert data into the new primary, check the connected replica work
insertVal = utils.ConfirmReplication(t, tablets[1], []*cluster.Vttablet{tablets[0], tablets[3]})
} else {
assert.Contains(t, out, fmt.Sprintf("TabletManager.PrimaryStatus on %s error", tablets[2].Alias))
assert.Contains(t, out, fmt.Sprintf("TabletManager.PrimaryStatus on %s", tablets[2].Alias))
// insert data into the old primary, check the connected replica works. The primary tablet shouldn't have changed.
insertVal = utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[3]})
}
Expand Down
15 changes: 13 additions & 2 deletions go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,19 @@ func (vc *VitessCluster) CleanupDataroot(t *testing.T, recreate bool) {
return
}
dir := vc.ClusterConfig.vtdataroot
log.Infof("Deleting vtdataroot %s", dir)
err := os.RemoveAll(dir)
// The directory cleanup sometimes fails with a "directory not empty" error as
// everything in the test is shutting down and cleaning up. So we retry a few
// times to deal with that non-problematic and ephemeral issue.
var err error
retries := 3
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is an issue in CI is just a 3 second retry enough?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's only been seen in the CI AFAIK, and so far 3 retries with a 1 second delay seems to have been enough (we've done essentially the same thing in several places now).

for i := 1; i <= retries; i++ {
if err = os.RemoveAll(dir); err == nil {
log.Infof("Deleted vtdataroot %q", dir)
break
}
log.Errorf("Failed to delete vtdataroot (attempt %d of %d) %q: %v", i, retries, dir, err)
time.Sleep(1 * time.Second)
}
mattlord marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
if recreate {
err = os.Mkdir(dir, 0700)
Expand Down
14 changes: 13 additions & 1 deletion go/vt/vtctl/workflow/resharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ func (s *Server) buildResharder(ctx context.Context, keyspace, workflow string,
return rs, nil
}

// validateTargets ensures that the target shards have no existing
// VReplication workflow streams as that is an invalid starting
// state for the non-serving shards involved in a Reshard.
func (rs *resharder) validateTargets(ctx context.Context) error {
err := rs.forAll(rs.targetShards, func(target *topo.ShardInfo) error {
targetPrimary := rs.targetPrimaries[target.ShardName()]
Expand Down Expand Up @@ -260,6 +263,8 @@ func (rs *resharder) copySchema(ctx context.Context) error {
return err
}

// createStreams creates all of the VReplication streams that
// need to now exist on the new shards.
func (rs *resharder) createStreams(ctx context.Context) error {
var excludeRules []*binlogdatapb.Rule
for tableName, table := range rs.vschema.Tables {
Expand Down Expand Up @@ -321,7 +326,14 @@ func (rs *resharder) createStreams(ctx context.Context) error {
func (rs *resharder) startStreams(ctx context.Context) error {
err := rs.forAll(rs.targetShards, func(target *topo.ShardInfo) error {
targetPrimary := rs.targetPrimaries[target.ShardName()]
query := fmt.Sprintf("update _vt.vreplication set state='Running' where db_name=%s", encodeString(targetPrimary.DbName()))
// This is the rare case where we truly want to update every stream/record
// because we've already confirmed that there were no existing workflows
// on the shards when we started, and we want to start all of the ones
// that we've created on the new shards as we're migrating them.
// We use the comment directive to indicate that this is intentional
// and OK.
query := fmt.Sprintf("update /*vt+ %s */ _vt.vreplication set state='Running' where db_name=%s",
vreplication.AllowUnsafeWriteCommentDirective, encodeString(targetPrimary.DbName()))
if _, err := rs.s.tmc.VReplicationExec(ctx, targetPrimary.Tablet, query); err != nil {
return vterrors.Wrapf(err, "VReplicationExec(%v, %s)", targetPrimary.Tablet, query)
}
Expand Down
27 changes: 21 additions & 6 deletions go/vt/vtctl/workflow/stream_migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"google.golang.org/protobuf/encoding/prototext"

"vitess.io/vitess/go/mysql/replication"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/key"
Expand All @@ -41,6 +40,13 @@ import (
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

/*
This file contains code that is specific to VReplication Reshard
workflows -- which require migrating the *other* VReplication
workflows (aside from the Reshard workflow itself) that exist in
the keyspace from one set of shards to another when switching traffic.
*/

// StreamType is an enum representing the kind of stream.
//
// (TODO:@ajm188) This should be made package-private once the last references
Expand All @@ -54,7 +60,9 @@ const (
StreamTypeReference
)

// StreamMigrator contains information needed to migrate a stream
// StreamMigrator contains information needed to migrate VReplication
// streams during Reshard workflows when the keyspace's VReplication
// workflows need to be migrated from one set of shards to another.
type StreamMigrator struct {
streams map[string][]*VReplicationStream
workflows []string
Expand Down Expand Up @@ -149,21 +157,26 @@ func (sm *StreamMigrator) Templates() []*VReplicationStream {
return VReplicationStreams(sm.templates).Copy().ToSlice()
}

// CancelMigration cancels a migration
func (sm *StreamMigrator) CancelMigration(ctx context.Context) {
// CancelStreamMigrations cancels the stream migrations.
func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) {
if sm.streams == nil {
return
}

_ = sm.deleteTargetStreams(ctx)

// Restart the source streams, but leave the Reshard workflow's reverse
// variant stopped.
err := sm.ts.ForAllSources(func(source *MigrationSource) error {
query := fmt.Sprintf("update _vt.vreplication set state='Running', stop_pos=null, message='' where db_name=%s and workflow != %s", encodeString(source.GetPrimary().DbName()), encodeString(sm.ts.ReverseWorkflowName()))
// We intend to update all but our workflow's reverse streams, so we
// indicate that it's safe in this case using the comment diretive.
query := fmt.Sprintf("update /*vt+ %s */ _vt.vreplication set state='Running', stop_pos=null, message='' where db_name=%s and workflow != %s",
vreplication.AllowUnsafeWriteCommentDirective, encodeString(source.GetPrimary().DbName()), encodeString(sm.ts.ReverseWorkflowName()))
_, err := sm.ts.VReplicationExec(ctx, source.GetPrimary().Alias, query)
return err
})
if err != nil {
sm.logger.Errorf("Cancel migration failed: could not restart source streams: %v", err)
sm.logger.Errorf("Cancel stream migrations failed: could not restart source streams: %v", err)
}
}

Expand Down Expand Up @@ -200,6 +213,8 @@ func (sm *StreamMigrator) StopStreams(ctx context.Context) ([]string, error) {

/* tablet streams */

// readTabletStreams reads all of the VReplication workflow streams *except*
// the Reshard workflow's reverse variant.
func (sm *StreamMigrator) readTabletStreams(ctx context.Context, ti *topo.TabletInfo, constraint string) ([]*VReplicationStream, error) {
query := fmt.Sprintf("select id, workflow, source, pos, workflow_type, workflow_sub_type, defer_secondary_keys from _vt.vreplication where db_name=%s and workflow != %s",
encodeString(ti.DbName()), encodeString(sm.ts.ReverseWorkflowName()))
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/workflow/switcher_dry_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (dr *switcherDryRun) stopStreams(ctx context.Context, sm *StreamMigrator) (
}

func (dr *switcherDryRun) cancelMigration(ctx context.Context, sm *StreamMigrator) {
dr.drLog.Log("Cancel stream migrations as requested")
dr.drLog.Log("Cancel migration as requested")
}

func (dr *switcherDryRun) lockKeyspace(ctx context.Context, keyspace, _ string) (context.Context, func(*error), error) {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,7 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
ts.Logger().Errorf("Cancel migration failed:", err)
}

sm.CancelMigration(ctx)
sm.CancelStreamMigrations(ctx)

err = ts.ForAllTargets(func(target *MigrationTarget) error {
query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s",
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (tm *TabletManager) HandleRPCPanic(ctx context.Context, name string, args,
if *err != nil {
// error case
log.Warningf("TabletManager.%v(%v)(on %v from %v) error: %v", name, args, topoproto.TabletAliasString(tm.tabletAlias), from, (*err).Error())
*err = vterrors.Wrapf(*err, "TabletManager.%v on %v error: %v", name, topoproto.TabletAliasString(tm.tabletAlias), (*err).Error())
*err = vterrors.Wrapf(*err, "TabletManager.%v on %v", name, topoproto.TabletAliasString(tm.tabletAlias))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This removes an improper usage of wrapping which led to the error/cause being duplicated.

} else {
// success case
log.Infof("TabletManager.%v(%v)(on %v from %v): %#v", name, args, topoproto.TabletAliasString(tm.tabletAlias), from, reply)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vdiff/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const (
sqlUpdateVDiffState = "update _vt.vdiff set state = %s, last_error = left(%s, 1024) %s where id = %d"
sqlUpdateVDiffStopped = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.state = 'stopped', vdt.state = 'stopped', vd.last_error = ''
where vd.id = vdt.vdiff_id and vd.id = %a and vd.state != 'completed'`
sqlGetVReplicationEntry = "select * from _vt.vreplication %s"
sqlGetVReplicationEntry = "select * from _vt.vreplication %s" // A filter/where is added by the caller
sqlGetVDiffsToRun = "select * from _vt.vdiff where state in ('started','pending')" // what VDiffs have not been stopped or completed
sqlGetVDiffsToRetry = "select * from _vt.vdiff where state = 'error' and json_unquote(json_extract(options, '$.core_options.auto_retry')) = 'true'"
sqlGetVDiffID = "select id as id from _vt.vdiff where vdiff_uuid = %a"
Expand Down
89 changes: 87 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/controller_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package vreplication

import (
"fmt"
"strings"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/vt/sqlparser"
Expand Down Expand Up @@ -50,6 +51,80 @@ const (
reshardingJournalQuery
)

// A comment directive that you can include in your VReplication write
// statements if you want to bypass the safety checks that ensure you are
// being selective. The full comment directive looks like this:
// delete /*vt+ ALLOW_UNSAFE_VREPLICATION_WRITE */ from _vt.vreplication
const AllowUnsafeWriteCommentDirective = "ALLOW_UNSAFE_VREPLICATION_WRITE"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️


// Check that the given WHERE clause is using at least one of the specified
// columns with an equality or in operator to ensure that it is being
// properly selective and not unintentionally going to potentially affect
// multiple workflows.
// The engine's exec function -- used by the VReplicationExec RPC -- should
// provide guardrails for data changing statements and if the user wants get
// around them they can e.g. use the ExecuteFetchAsDba RPC.
// If you as a developer truly do want to affect multiple workflows, you can
// add a comment directive using the AllowUnsafeWriteCommentDirective constant.
var isSelective = func(where *sqlparser.Where, columns ...*sqlparser.ColName) bool {
if where == nil {
return false
}
if len(columns) == 0 {
return true
}
selective := false
_ = sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) {
switch node := node.(type) {
case *sqlparser.ComparisonExpr:
column, ok := node.Left.(*sqlparser.ColName)
if !ok {
return true, nil
}
wantedColumn := false
for i := range columns {
if columns[i].Equal(column) {
wantedColumn = true
break
}
}
// If we found a desired column, check that it is being used with an
// equality operator OR an in clause, logically being equal to any
// of N things.
if wantedColumn &&
(node.Operator == sqlparser.EqualOp || node.Operator == sqlparser.InOp) {
selective = true // This is a safe statement
return false, nil // We can stop walking
}
default:
}
return true, nil
}, where)
return selective
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice one!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this only limited to controller_plan or should this be expanded to include other places?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like it's probably a good thing to expand over time, but at this point vreplication.Engine.Exec() is the critical one and what's done here.


// tableSelectiveColumns is a map that can be used to declare
// what selective columns should be used (one or more) in queries
// against a table.
var tableSelectiveColumns = map[string][]*sqlparser.ColName{
vreplicationTableName: {
{Name: sqlparser.NewIdentifierCI("id")},
{Name: sqlparser.NewIdentifierCI("workflow")},
},
}

// columnsAsCSV returns a comma-separated list of column names.
func columnsAsCSV(columns []*sqlparser.ColName) string {
if len(columns) == 0 {
return ""
}
colsForError := make([]string, len(columns))
for i := range columns {
colsForError[i] = columns[i].Name.String()
}
return strings.Join(colsForError, ", ")
}

// buildControllerPlan parses the input query and returns an appropriate plan.
func buildControllerPlan(query string, parser *sqlparser.Parser) (*controllerPlan, error) {
stmt, err := parser.Parse(query)
Expand Down Expand Up @@ -163,7 +238,12 @@ func buildUpdatePlan(upd *sqlparser.Update) (*controllerPlan, error) {
opcode: reshardingJournalQuery,
}, nil
case vreplicationTableName:
// no-op
if upd.Comments == nil || upd.Comments.Directives() == nil || !upd.Comments.Directives().IsSet(AllowUnsafeWriteCommentDirective) {
if safe := isSelective(upd.Where, tableSelectiveColumns[vreplicationTableName]...); !safe {
return nil, fmt.Errorf("unsafe WHERE clause in update without the /*vt+ %s */ comment directive: %s; should be using = or in with at least one of the following columns: %s",
AllowUnsafeWriteCommentDirective, sqlparser.String(upd.Where), columnsAsCSV(tableSelectiveColumns[vreplicationTableName]))
}
}
default:
return nil, fmt.Errorf("invalid table name: %s", tableName.Name.String())
}
Expand Down Expand Up @@ -220,7 +300,12 @@ func buildDeletePlan(del *sqlparser.Delete) (*controllerPlan, error) {
opcode: reshardingJournalQuery,
}, nil
case vreplicationTableName:
// no-op
if del.Comments == nil || del.Comments.Directives() == nil || !del.Comments.Directives().IsSet(AllowUnsafeWriteCommentDirective) {
if safe := isSelective(del.Where, tableSelectiveColumns[vreplicationTableName]...); !safe {
return nil, fmt.Errorf("unsafe WHERE clause in delete without the /*vt+ %s */ comment directive: %s; should be using = or in with at least one of the following columns: %s",
AllowUnsafeWriteCommentDirective, sqlparser.String(del.Where), columnsAsCSV(tableSelectiveColumns[vreplicationTableName]))
}
}
default:
return nil, fmt.Errorf("invalid table name: %s", tableName.Name.String())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,25 @@ func TestControllerPlan(t *testing.T) {
applier: "update _vt.vreplication set state = 'Running' where id in ::ids",
},
}, {
in: "update _vt.vreplication set state='Running'",
in: "update _vt.vreplication set state='Running'",
err: "unsafe WHERE clause in update without the /*vt+ ALLOW_UNSAFE_VREPLICATION_WRITE */ comment directive: ; should be using = or in with at least one of the following columns: id, workflow",
}, {
in: "update /*vt+ ALLOW_UNSAFE_VREPLICATION_WRITE */ _vt.vreplication set state='Running'",
plan: &testControllerPlan{
query: "update _vt.vreplication set state='Running'",
query: "update /*vt+ ALLOW_UNSAFE_VREPLICATION_WRITE */ _vt.vreplication set state='Running'",
opcode: updateQuery,
selector: "select id from _vt.vreplication",
applier: "update _vt.vreplication set state = 'Running' where id in ::ids",
applier: "update /*vt+ ALLOW_UNSAFE_VREPLICATION_WRITE */ _vt.vreplication set state = 'Running' where id in ::ids",
},
}, {
in: "update _vt.vreplication set state='Running', message='' where id >= 1",
err: "unsafe WHERE clause in update without the /*vt+ ALLOW_UNSAFE_VREPLICATION_WRITE */ comment directive: where id >= 1; should be using = or in with at least one of the following columns: id, workflow",
}, {
in: "update _vt.vreplication set state = 'Running' where state in ('Stopped', 'Error')",
err: "unsafe WHERE clause in update without the /*vt+ ALLOW_UNSAFE_VREPLICATION_WRITE */ comment directive: where state in ('Stopped', 'Error'); should be using = or in with at least one of the following columns: id, workflow",
}, {
in: "update _vt.vreplication set state='Running', message='' where state='Stopped'",
err: "unsafe WHERE clause in update without the /*vt+ ALLOW_UNSAFE_VREPLICATION_WRITE */ comment directive: where state = 'Stopped'; should be using = or in with at least one of the following columns: id, workflow",
}, {
in: "update _vt.vreplication set state='Running' where a = 1",
plan: &testControllerPlan{
Expand All @@ -128,6 +140,7 @@ func TestControllerPlan(t *testing.T) {
selector: "select id from _vt.vreplication where a = 1",
applier: "update _vt.vreplication set state = 'Running' where id in ::ids",
},
err: "unsafe WHERE clause in update without the /*vt+ ALLOW_UNSAFE_VREPLICATION_WRITE */ comment directive: where a = 1; should be using = or in with at least one of the following columns: id, workflow",
}, {
in: "update _vt.resharding_journal set col = 1",
plan: &testControllerPlan{
Expand Down Expand Up @@ -159,15 +172,21 @@ func TestControllerPlan(t *testing.T) {
delPostCopyAction: "delete from _vt.post_copy_action where vrepl_id in ::ids",
},
}, {
in: "delete from _vt.vreplication",
in: "delete from _vt.vreplication",
err: "unsafe WHERE clause in delete without the /*vt+ ALLOW_UNSAFE_VREPLICATION_WRITE */ comment directive: ; should be using = or in with at least one of the following columns: id, workflow",
}, {
in: "delete /*vt+ ALLOW_UNSAFE_VREPLICATION_WRITE */ from _vt.vreplication",
plan: &testControllerPlan{
query: "delete from _vt.vreplication",
query: "delete /*vt+ ALLOW_UNSAFE_VREPLICATION_WRITE */ from _vt.vreplication",
opcode: deleteQuery,
selector: "select id from _vt.vreplication",
applier: "delete from _vt.vreplication where id in ::ids",
applier: "delete /*vt+ ALLOW_UNSAFE_VREPLICATION_WRITE */ from _vt.vreplication where id in ::ids",
delCopyState: "delete from _vt.copy_state where vrepl_id in ::ids",
delPostCopyAction: "delete from _vt.post_copy_action where vrepl_id in ::ids",
},
}, {
in: "delete from _vt.vreplication where state='Stopped'",
err: "unsafe WHERE clause in delete without the /*vt+ ALLOW_UNSAFE_VREPLICATION_WRITE */ comment directive: where state = 'Stopped'; should be using = or in with at least one of the following columns: id, workflow",
}, {
in: "delete from _vt.vreplication where a = 1",
plan: &testControllerPlan{
Expand All @@ -178,6 +197,7 @@ func TestControllerPlan(t *testing.T) {
delCopyState: "delete from _vt.copy_state where vrepl_id in ::ids",
delPostCopyAction: "delete from _vt.post_copy_action where vrepl_id in ::ids",
},
err: "unsafe WHERE clause in delete without the /*vt+ ALLOW_UNSAFE_VREPLICATION_WRITE */ comment directive: where a = 1; should be using = or in with at least one of the following columns: id, workflow",
}, {
in: "delete from _vt.resharding_journal where id = 1",
plan: &testControllerPlan{
Expand Down
11 changes: 6 additions & 5 deletions go/vt/vttablet/tabletmanager/vreplication/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,16 @@ import (
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

const (
Expand Down Expand Up @@ -533,7 +534,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error)
}
return qr, nil
case selectQuery, reshardingJournalQuery:
// select and resharding journal queries are passed through.
// Selects and resharding journal queries are passed through.
return dbClient.ExecuteFetch(plan.query, maxRows)
}
panic("unreachable")
Expand Down Expand Up @@ -856,7 +857,7 @@ func (vre *Engine) readAllRows(ctx context.Context) ([]map[string]string, error)
return nil, err
}
defer dbClient.Close()
qr, err := dbClient.ExecuteFetch(fmt.Sprintf("select * from _vt.vreplication where db_name=%v", encodeString(vre.dbName)), maxRows)
qr, err := dbClient.ExecuteFetch(fmt.Sprintf("select * from _vt.vreplication where db_name=%s", encodeString(vre.dbName)), maxRows)
if err != nil {
return nil, err
}
Expand Down
Loading
Loading