Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ApplySchema: support --batch-size flag in 'direct' strategy #13693

Merged
merged 4 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/cmd/vtctld/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func initSchema() {
_, err = schemamanager.Run(
ctx,
controller,
schemamanager.NewTabletExecutor("vtctld/schema", wr.TopoServer(), wr.TabletManagerClient(), wr.Logger(), schemaChangeReplicasTimeout),
schemamanager.NewTabletExecutor("vtctld/schema", wr.TopoServer(), wr.TabletManagerClient(), wr.Logger(), schemaChangeReplicasTimeout, 0),
)
if err != nil {
log.Errorf("Schema change failed, error: %v", err)
Expand Down
3 changes: 3 additions & 0 deletions go/cmd/vtctldclient/command/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ var applySchemaOptions = struct {
WaitReplicasTimeout time.Duration
SkipPreflight bool
CallerID string
BatchSize int64
}{}

func commandApplySchema(cmd *cobra.Command, args []string) error {
Expand Down Expand Up @@ -145,6 +146,7 @@ func commandApplySchema(cmd *cobra.Command, args []string) error {
MigrationContext: applySchemaOptions.MigrationContext,
WaitReplicasTimeout: protoutil.DurationToProto(applySchemaOptions.WaitReplicasTimeout),
CallerId: cid,
BatchSize: applySchemaOptions.BatchSize,
})
if err != nil {
return err
Expand Down Expand Up @@ -294,6 +296,7 @@ func init() {
ApplySchema.Flags().StringVar(&applySchemaOptions.CallerID, "caller-id", "", "Effective caller ID used for the operation and should map to an ACL name which grants this identity the necessary permissions to perform the operation (this is only necessary when strict table ACLs are used).")
ApplySchema.Flags().StringArrayVar(&applySchemaOptions.SQL, "sql", nil, "Semicolon-delimited, repeatable SQL commands to apply. Exactly one of --sql|--sql-file is required.")
ApplySchema.Flags().StringVar(&applySchemaOptions.SQLFile, "sql-file", "", "Path to a file containing semicolon-delimited SQL commands to apply. Exactly one of --sql|--sql-file is required.")
ApplySchema.Flags().Int64Var(&applySchemaOptions.BatchSize, "batch-size", 0, "How many queries to batch together. Only applicabel when all queries are CREATE TABLE|VIEW")

Root.AddCommand(ApplySchema)

Expand Down
5 changes: 4 additions & 1 deletion go/test/endtoend/cluster/vtctlclient_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type VtctlClientParams struct {
MigrationContext string
UUIDList string
CallerID string
BatchSize int
}

// InitShardPrimary executes vtctlclient command to make specified tablet the primary for the shard.
Expand Down Expand Up @@ -87,7 +88,9 @@ func (vtctlclient *VtctlClientProcess) ApplySchemaWithOutput(Keyspace string, SQ
if params.UUIDList != "" {
args = append(args, "--uuid_list", params.UUIDList)
}

if params.BatchSize > 0 {
args = append(args, "--batch_size", fmt.Sprintf("%d", params.BatchSize))
}
if params.CallerID != "" {
args = append(args, "--caller_id", params.CallerID)
}
Expand Down
21 changes: 18 additions & 3 deletions go/test/endtoend/vtgate/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func TestSchemaChange(t *testing.T) {
testWithAlterDatabase(t)
testWithDropCreateSchema(t)
testDropNonExistentTables(t)
testApplySchemaBatch(t)
testCreateInvalidView(t)
testCopySchemaShards(t, clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].VttabletProcess.TabletPath, 2)
testCopySchemaShards(t, fmt.Sprintf("%s/0", keyspaceName), 3)
Expand All @@ -126,7 +127,6 @@ func testWithInitialSchema(t *testing.T) {

// Check if 4 tables are created
checkTables(t, totalTableCount)
checkTables(t, totalTableCount)

// Also match the vschema for those tablets
matchSchema(t, clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].VttabletProcess.TabletPath, clusterInstance.Keyspaces[0].Shards[1].Vttablets[0].VttabletProcess.TabletPath)
Expand All @@ -144,7 +144,7 @@ func testWithAlterSchema(t *testing.T) {
func testWithAlterDatabase(t *testing.T) {
sql := "create database alter_database_test; alter database alter_database_test default character set = utf8mb4; drop database alter_database_test"
err := clusterInstance.VtctlclientProcess.ApplySchema(keyspaceName, sql)
assert.Nil(t, err)
assert.NoError(t, err)
}

// testWithDropCreateSchema , we should be able to drop and create same schema
Expand All @@ -158,7 +158,7 @@ func testWithAlterDatabase(t *testing.T) {
func testWithDropCreateSchema(t *testing.T) {
dropCreateTable := fmt.Sprintf("DROP TABLE vt_select_test_%02d ;", 2) + fmt.Sprintf(createTable, fmt.Sprintf("vt_select_test_%02d", 2))
err := clusterInstance.VtctlclientProcess.ApplySchema(keyspaceName, dropCreateTable)
require.Nil(t, err)
require.NoError(t, err)
checkTables(t, totalTableCount)
}

Expand Down Expand Up @@ -225,6 +225,21 @@ func testCreateInvalidView(t *testing.T) {
}
}

func testApplySchemaBatch(t *testing.T) {
{
sqls := "create table batch1(id int primary key);create table batch2(id int primary key);create table batch3(id int primary key);create table batch4(id int primary key);create table batch5(id int primary key);"
_, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ApplySchema", "--", "--sql", sqls, "--batch_size", "2", keyspaceName)
require.NoError(t, err)
checkTables(t, totalTableCount+5)
}
{
sqls := "drop table batch1; drop table batch2; drop table batch3; drop table batch4; drop table batch5"
_, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ApplySchema", "--", "--sql", sqls, keyspaceName)
require.NoError(t, err)
checkTables(t, totalTableCount)
}
}

// checkTables checks the number of tables in the first two shards.
func checkTables(t *testing.T, count int) {
checkTablesCount(t, clusterInstance.Keyspaces[0].Shards[0].Vttablets[0], count)
Expand Down
Loading
Loading