Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add -rename_tables to DropSources #6383

Merged
merged 1 commit into from
Jun 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,18 @@ func shardCustomer(t *testing.T, testReverse bool) {
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchWrites", "customer.p2c"); err != nil {
t.Fatalf("SwitchWrites error: %s\n", output)
}
want = dryRunResultsDropSourcesCustomerShard
want = dryRunResultsDropSourcesDropCustomerShard
if output, err = vc.VtctlClient.ExecuteCommandWithOutput("DropSources", "-dry_run", "customer.p2c"); err != nil {
t.Fatalf("DropSources dry run error: %s\n", output)
}
validateDryRunResults(t, output, want)

want = dryRunResultsDropSourcesRenameCustomerShard
if output, err = vc.VtctlClient.ExecuteCommandWithOutput("DropSources", "-dry_run", "-rename_tables", "customer.p2c"); err != nil {
t.Fatalf("DropSources dry run with rename error: %s\n", output)
}
validateDryRunResults(t, output, want)

var exists bool
if exists, err = checkIfBlacklistExists(t, vc, "product:0", "customer"); err != nil {
t.Fatal("Error getting blacklist for customer:0")
Expand Down
20 changes: 18 additions & 2 deletions go/test/endtoend/vreplication/vreplication_test_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,27 @@ var dryrunresultsswitchwritesM2m3 = []string{
"Unlock keyspace merchant",
}

var dryRunResultsDropSourcesCustomerShard = []string{
var dryRunResultsDropSourcesDropCustomerShard = []string{
"Lock keyspace product",
"Lock keyspace customer",
"Dropping following tables:",
" Keyspace product Shard 0 DbName vt_product Tablet 100 Table customer",
" Keyspace product Shard 0 DbName vt_product Tablet 100 Table customer RemovalType DROP TABLE",
"Blacklisted tables customer will be removed from:",
" Keyspace product Shard 0 Tablet 100",
"Delete reverse vreplication streams on source:",
" Keyspace product Shard 0 Workflow p2c_reverse DbName vt_product Tablet 100",
"Delete vreplication streams on target:",
" Keyspace customer Shard -80 Workflow p2c DbName vt_customer Tablet 200",
" Keyspace customer Shard 80- Workflow p2c DbName vt_customer Tablet 300",
"Unlock keyspace customer",
"Unlock keyspace product",
}

var dryRunResultsDropSourcesRenameCustomerShard = []string{
"Lock keyspace product",
"Lock keyspace customer",
"Dropping following tables:",
" Keyspace product Shard 0 DbName vt_product Tablet 100 Table customer RemovalType RENAME TABLE",
"Blacklisted tables customer will be removed from:",
" Keyspace product Shard 0 Tablet 100",
"Delete reverse vreplication streams on source:",
Expand Down
11 changes: 9 additions & 2 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ var commands = []commandGroup{
"[-cell=<cell>] [-tablet_types=<source_tablet_types>] -workflow=<workflow> <source_keyspace> <target_keyspace> <table_specs>",
`Move table(s) to another keyspace, table_specs is a list of tables or the tables section of the vschema for the target keyspace. Example: '{"t1":{"column_vindexes": [{""column": "id1", "name": "hash"}]}, "t2":{"column_vindexes": [{""column": "id2", "name": "hash"}]}}`},
{"DropSources", commandDropSources,
"[-dry_run] <keyspace.workflow>",
"[-dry_run] [-rename_tables] <keyspace.workflow>",
"After a MoveTables or Resharding workflow cleanup unused artifacts like source tables, source shards and blacklists"},
{"CreateLookupVindex", commandCreateLookupVindex,
"[-cell=<cell>] [-tablet_types=<source_tablet_types>] <keyspace> <json_spec>",
Expand Down Expand Up @@ -2042,6 +2042,7 @@ func commandMigrateServedFrom(ctx context.Context, wr *wrangler.Wrangler, subFla

func commandDropSources(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
dryRun := subFlags.Bool("dry_run", false, "Does a dry run of commandDropSources and only reports the actions to be taken")
renameTables := subFlags.Bool("rename_tables", false, "Rename tables instead of dropping them")
if err := subFlags.Parse(args); err != nil {
return err
}
Expand All @@ -2052,8 +2053,14 @@ func commandDropSources(ctx context.Context, wr *wrangler.Wrangler, subFlags *fl
if err != nil {
return err
}

removalType := wrangler.DropTable
if *renameTables {
removalType = wrangler.RenameTable
}

_, _, _ = dryRun, keyspace, workflow
dryRunResults, err := wr.DropSources(ctx, keyspace, workflow, *dryRun)
dryRunResults, err := wr.DropSources(ctx, keyspace, workflow, removalType, *dryRun)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/wrangler/stream_migrater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func TestStreamMigrateMainflow(t *testing.T) {

tme.expectDeleteReverseVReplication()
tme.expectDeleteTargetVReplication()
if _, err := tme.wr.DropSources(ctx, tme.targetKeyspace, "test", false); err != nil {
if _, err := tme.wr.DropSources(ctx, tme.targetKeyspace, "test", DropTable, false); err != nil {
t.Fatal(err)
}
verifyQueries(t, tme.allDBClients)
Expand Down
4 changes: 2 additions & 2 deletions go/vt/wrangler/switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func (r *switcher) validateWorkflowHasCompleted(ctx context.Context) error {

//TODO: do we need to disable ForeignKey before dropping tables?
//TODO: delete multiple tables in single statement?
func (r *switcher) dropSourceTables(ctx context.Context) error {
return r.ts.dropSourceTables(ctx)
func (r *switcher) removeSourceTables(ctx context.Context, removalType TableRemovalType) error {
return r.ts.removeSourceTables(ctx, removalType)
}

func (r *switcher) dropSourceShards(ctx context.Context) error {
Expand Down
6 changes: 3 additions & 3 deletions go/vt/wrangler/switcher_dry_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,12 @@ func (dr *switcherDryRun) lockKeyspace(ctx context.Context, keyspace, _ string)
}, nil
}

func (dr *switcherDryRun) dropSourceTables(ctx context.Context) error {
func (dr *switcherDryRun) removeSourceTables(ctx context.Context, removalType TableRemovalType) error {
logs := make([]string, 0)
for _, source := range dr.ts.sources {
for _, tableName := range dr.ts.tables {
logs = append(logs, fmt.Sprintf("\tKeyspace %s Shard %s DbName %s Tablet %d Table %s",
source.master.Keyspace, source.master.Shard, source.master.DbName(), source.master.Alias.Uid, tableName))
logs = append(logs, fmt.Sprintf("\tKeyspace %s Shard %s DbName %s Tablet %d Table %s RemovalType %s",
source.master.Keyspace, source.master.Shard, source.master.DbName(), source.master.Alias.Uid, tableName, TableRemovalType(removalType)))
}
}
if len(logs) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/wrangler/switcher_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type iswitcher interface {
switchTableReads(ctx context.Context, cells []string, servedType topodatapb.TabletType, direction TrafficSwitchDirection) error
switchShardReads(ctx context.Context, cells []string, servedType topodatapb.TabletType, direction TrafficSwitchDirection) error
validateWorkflowHasCompleted(ctx context.Context) error
dropSourceTables(ctx context.Context) error
removeSourceTables(ctx context.Context, removalType TableRemovalType) error
dropSourceShards(ctx context.Context) error
dropSourceBlacklistedTables(ctx context.Context) error
freezeTargetVReplication(ctx context.Context) error
Expand Down
39 changes: 33 additions & 6 deletions go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,27 @@ const (
DirectionBackward
)

// TableRemovalType specifies the way the a table will be removed
type TableRemovalType int

// The following consts define if DropSource will drop or rename the table
const (
DropTable = TableRemovalType(iota)
RenameTable
)

func (trt TableRemovalType) String() string {
types := [...]string{
"DROP TABLE",
"RENAME TABLE",
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth extracting the above code outside the method, since it always generates the same. Also, suggesting to make it a map, such that e.g. types[DropTable]="DROP TABLE".

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shlomi-noach from a logical perspective, it makes total sense to move the code outside a method that gets repeatedly called. However, I don't understand what you mean by "make it a map". I mean, I could figure out how, but I don't get why.

Copy link
Contributor

@shlomi-noach shlomi-noach Jun 30, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tomkrouper yeah, so I believe it would be a more idiomatic way; for example, right now the code runs if trt < DropTable || trt > RenameTable {, but, if we add a new, third option, then we'd need to change trt > RenameTable to trt > SomethingNew ; where in fact we don't care about ranges, but of existence, and that's where maps are idiomatic and their use is widespread.

So, if we have:

mymap := map[string]string{DropTable: "DROP TABLE", RenameTable: "RENAME TABLE"}

we can then:

if t, ok := mymap[trt] ; ok {

and that answers "is trt known or unknown to us".

Anyway, that's the why the way I understand it, but I may be just biased towards my own habits.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes total sense. Thanks.

if trt < DropTable || trt > RenameTable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

following the above map suggestion, the above would turn to:

if t, ok := tableRemovalTypeMap[trt] ; ok {
  return t
}
return "Unknown"

return "Unknown"
}

return types[trt]
}

// accessType specifies the type of access for a shard (allow/disallow writes).
type accessType int

Expand Down Expand Up @@ -293,7 +314,7 @@ func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflow s
}

// DropSources cleans up source tables, shards and blacklisted tables after a MoveTables/Reshard is completed
func (wr *Wrangler) DropSources(ctx context.Context, targetKeyspace, workflow string, dryRun bool) (*[]string, error) {
func (wr *Wrangler) DropSources(ctx context.Context, targetKeyspace, workflow string, removalType TableRemovalType, dryRun bool) (*[]string, error) {
ts, err := wr.buildTrafficSwitcher(ctx, targetKeyspace, workflow)
if err != nil {
wr.Logger().Errorf("buildTrafficSwitcher failed: %v", err)
Expand Down Expand Up @@ -328,7 +349,7 @@ func (wr *Wrangler) DropSources(ctx context.Context, targetKeyspace, workflow st
}
switch ts.migrationType {
case binlogdatapb.MigrationType_TABLES:
if err := sw.dropSourceTables(ctx); err != nil {
if err := sw.removeSourceTables(ctx, removalType); err != nil {
return nil, err
}
if err := sw.dropSourceBlacklistedTables(ctx); err != nil {
Expand Down Expand Up @@ -1122,17 +1143,23 @@ func doValidateWorkflowHasCompleted(ctx context.Context, ts *trafficSwitcher) er

}

func (ts *trafficSwitcher) dropSourceTables(ctx context.Context) error {
func (ts *trafficSwitcher) removeSourceTables(ctx context.Context, removalType TableRemovalType) error {
return ts.forAllSources(func(source *tsSource) error {
for _, tableName := range ts.tables {
ts.wr.Logger().Infof("Dropping table %s.%s\n", source.master.DbName(), tableName)
query := fmt.Sprintf("drop table %s.%s", source.master.DbName(), tableName)
if removalType == DropTable {
ts.wr.Logger().Infof("Dropping table %s.%s\n", source.master.DbName(), tableName)
} else {
renameName := fmt.Sprintf("_%.63s", tableName)
ts.wr.Logger().Infof("Renaming table %s.%s to %s.%s\n", source.master.DbName(), tableName, source.master.DbName(), renameName)
query = fmt.Sprintf("rename table %s.%s TO %s.%s", source.master.DbName(), tableName, source.master.DbName(), renameName)
}
_, err := ts.wr.ExecuteFetchAsDba(ctx, source.master.Alias, query, 1, false, true)
if err != nil {
ts.wr.Logger().Errorf("Error dropping table %s: %v", tableName, err)
ts.wr.Logger().Errorf("Error removing table %s: %v", tableName, err)
return err
}
ts.wr.Logger().Infof("Dropped table %s.%s\n", source.master.DbName(), tableName)
ts.wr.Logger().Infof("Removed table %s.%s\n", source.master.DbName(), tableName)

}
return nil
Expand Down
37 changes: 31 additions & 6 deletions go/vt/wrangler/traffic_switcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,7 @@ func TestTableMigrateOneToMany(t *testing.T) {
tme.dbTargetClients[1].addQuery("select 1 from _vt.vreplication where db_name='vt_ks2' and workflow='test' and message!='FROZEN'", &sqltypes.Result{}, nil)
}
dropSourcesInvalid()
_, err = tme.wr.DropSources(ctx, tme.targetKeyspace, "test", false)
_, err = tme.wr.DropSources(ctx, tme.targetKeyspace, "test", DropTable, false)
require.Error(t, err, "Workflow has not completed, cannot DropSources")

_, _, err = tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, false, false, false)
Expand All @@ -815,13 +815,12 @@ func TestTableMigrateOneToMany(t *testing.T) {
tme.dbTargetClients[1].addQuery("select 1 from _vt.vreplication where db_name='vt_ks2' and workflow='test' and message!='FROZEN'", &sqltypes.Result{}, nil)
}
dropSourcesDryRun()

wantdryRunDropSources := []string{
"Lock keyspace ks1",
"Lock keyspace ks2",
"Dropping following tables:",
" Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t1",
" Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t2",
" Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t1 RemovalType DROP TABLE",
" Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t2 RemovalType DROP TABLE",
"Blacklisted tables t1,t2 will be removed from:",
" Keyspace ks1 Shard 0 Tablet 10",
"Delete reverse vreplication streams on source:",
Expand All @@ -832,11 +831,37 @@ func TestTableMigrateOneToMany(t *testing.T) {
"Unlock keyspace ks2",
"Unlock keyspace ks1",
}
results, err := tme.wr.DropSources(ctx, tme.targetKeyspace, "test", true)
results, err := tme.wr.DropSources(ctx, tme.targetKeyspace, "test", DropTable, true)
require.NoError(t, err)
require.Empty(t, cmp.Diff(wantdryRunDropSources, *results))
checkBlacklist(t, tme.ts, fmt.Sprintf("%s:%s", "ks1", "0"), []string{"t1", "t2"})

dropSourcesDryRunRename := func() {
tme.dbTargetClients[0].addQuery("select 1 from _vt.vreplication where db_name='vt_ks2' and workflow='test' and message!='FROZEN'", &sqltypes.Result{}, nil)
tme.dbTargetClients[1].addQuery("select 1 from _vt.vreplication where db_name='vt_ks2' and workflow='test' and message!='FROZEN'", &sqltypes.Result{}, nil)
}
dropSourcesDryRunRename()
wantdryRunRenameSources := []string{
"Lock keyspace ks1",
"Lock keyspace ks2",
"Dropping following tables:",
" Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t1 RemovalType RENAME TABLE",
" Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t2 RemovalType RENAME TABLE",
"Blacklisted tables t1,t2 will be removed from:",
" Keyspace ks1 Shard 0 Tablet 10",
"Delete reverse vreplication streams on source:",
" Keyspace ks1 Shard 0 Workflow test_reverse DbName vt_ks1 Tablet 10",
"Delete vreplication streams on target:",
" Keyspace ks2 Shard -80 Workflow test DbName vt_ks2 Tablet 20",
" Keyspace ks2 Shard 80- Workflow test DbName vt_ks2 Tablet 30",
"Unlock keyspace ks2",
"Unlock keyspace ks1",
}
results, err = tme.wr.DropSources(ctx, tme.targetKeyspace, "test", RenameTable, true)
require.NoError(t, err)
require.Empty(t, cmp.Diff(wantdryRunRenameSources, *results))
checkBlacklist(t, tme.ts, fmt.Sprintf("%s:%s", "ks1", "0"), []string{"t1", "t2"})

dropSources := func() {
tme.dbTargetClients[0].addQuery("select 1 from _vt.vreplication where db_name='vt_ks2' and workflow='test' and message!='FROZEN'", &sqltypes.Result{}, nil)
tme.dbTargetClients[1].addQuery("select 1 from _vt.vreplication where db_name='vt_ks2' and workflow='test' and message!='FROZEN'", &sqltypes.Result{}, nil)
Expand All @@ -852,7 +877,7 @@ func TestTableMigrateOneToMany(t *testing.T) {
}
dropSources()

_, err = tme.wr.DropSources(ctx, tme.targetKeyspace, "test", false)
_, err = tme.wr.DropSources(ctx, tme.targetKeyspace, "test", DropTable, false)
require.NoError(t, err)
checkBlacklist(t, tme.ts, fmt.Sprintf("%s:%s", "ks1", "0"), nil)

Expand Down