Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding ddl_strategy session variable #7042

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 24 additions & 14 deletions go/test/endtoend/onlineddl/onlineddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,22 @@ var (
msg varchar(64),
PRIMARY KEY (id)
) ENGINE=InnoDB;`
// To verify non online-DDL behavior
alterTableNormalStatement = `
ALTER TABLE %s
ADD COLUMN non_online INT UNSIGNED NOT NULL`
// The following statement is valid
alterTableSuccessfulStatament = `
alterTableSuccessfulStatement = `
ALTER WITH 'gh-ost' TABLE %s
MODIFY id BIGINT UNSIGNED NOT NULL,
ADD COLUMN ghost_col INT NOT NULL,
ADD INDEX idx_msg(msg)`
// The following statement will fail because gh-ost requires some shared unique key
alterTableFailedStatament = `
alterTableFailedStatement = `
ALTER WITH 'gh-ost' TABLE %s
DROP PRIMARY KEY,
DROP COLUMN ghost_col`
alterTableThrottlingStatament = `
alterTableThrottlingStatement = `
ALTER WITH 'gh-ost' '--max-load=Threads_running=1' TABLE %s
DROP COLUMN ghost_col`
)
Expand Down Expand Up @@ -123,20 +127,23 @@ func TestSchemaChange(t *testing.T) {
assert.Equal(t, 2, len(clusterInstance.Keyspaces[0].Shards))
testWithInitialSchema(t)
{
uuid := testAlterTable(t, alterTableSuccessfulStatament)
_ = testAlterTable(t, alterTableNormalStatement, false, "non_online")
}
{
uuid := testAlterTable(t, alterTableSuccessfulStatement, true, "ghost_col")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete)
checkCancelMigration(t, uuid, false)
checkRetryMigration(t, uuid, false)
}
{
uuid := testAlterTable(t, alterTableThrottlingStatament)
uuid := testAlterTable(t, alterTableThrottlingStatement, true, "ghost_col")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusRunning)
checkCancelMigration(t, uuid, true)
time.Sleep(2 * time.Second)
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusFailed)
}
{
uuid := testAlterTable(t, alterTableFailedStatament)
uuid := testAlterTable(t, alterTableFailedStatement, true, "ghost_col")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusFailed)
checkCancelMigration(t, uuid, false)
checkRetryMigration(t, uuid, true)
Expand All @@ -157,16 +164,20 @@ func testWithInitialSchema(t *testing.T) {
}

// testAlterTable runs an online DDL, ALTER statement
func testAlterTable(t *testing.T, alterStatement string) (uuid string) {
func testAlterTable(t *testing.T, alterStatement string, isOnlineDDL bool, expectColumn string) (uuid string) {
tableName := fmt.Sprintf("vt_onlineddl_test_%02d", 3)
sqlQuery := fmt.Sprintf(alterStatement, tableName)
uuid, err := clusterInstance.VtctlclientProcess.ApplySchemaWithOutput(keyspaceName, sqlQuery)
require.Nil(t, err)
uuid = strings.TrimSpace(uuid)
require.NotEmpty(t, uuid)
// Migration is asynchronous. Give it some time.
time.Sleep(time.Second * 20)
checkMigratedTable(t, tableName)
if isOnlineDDL {
require.NotEmpty(t, uuid)
// Migration is asynchronous. Give it some time.
time.Sleep(time.Second * 20)
} else {
require.Empty(t, uuid)
}
checkMigratedTable(t, tableName, expectColumn)
return uuid
}

Expand Down Expand Up @@ -239,11 +250,10 @@ func checkRetryMigration(t *testing.T, uuid string, expectRetryPossible bool) {
}

// checkMigratedTables checks the CREATE STATEMENT of a table after migration
func checkMigratedTable(t *testing.T, tableName string) {
expect := "ghost_col"
func checkMigratedTable(t *testing.T, tableName, expectColumn string) {
for i := range clusterInstance.Keyspaces[0].Shards {
createStatement := getCreateTableStatement(t, clusterInstance.Keyspaces[0].Shards[i].Vttablets[0], tableName)
assert.Contains(t, createStatement, expect)
assert.Contains(t, createStatement, expectColumn)
}
}

Expand Down
181 changes: 96 additions & 85 deletions go/vt/proto/vtgate/vtgate.pb.go

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions go/vt/schema/online_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,19 @@ type OnlineDDL struct {
Retries int64 `json:"retries,omitempty"`
}

func ValidateDDLStrategy(strategy string) (sqlparser.DDLStrategy, error) {
switch sqlparser.DDLStrategy(strategy) {
case DDLStrategyGhost:
return DDLStrategyGhost, nil
case DDLStrategyPTOSC:
return DDLStrategyPTOSC, nil
case DDLStrategyNormal:
return DDLStrategyNormal, nil
default:
return DDLStrategyNormal, fmt.Errorf("Unknown online DDL strategy: '%v'", strategy)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: either return value or error

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in this case DDLStrategyNormal is equilavent to returning empty string, so this may be ok.

}
}

// FromJSON creates an OnlineDDL from json
func FromJSON(bytes []byte) (*OnlineDDL, error) {
onlineDDL := &OnlineDDL{}
Expand Down
22 changes: 22 additions & 0 deletions go/vt/schema/online_ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,28 @@ func TestCreateUUID(t *testing.T) {
assert.NoError(t, err)
}

func TestValidateDDLStrategy(t *testing.T) {
{
strategy, err := ValidateDDLStrategy("gh-ost")
assert.NoError(t, err)
assert.Equal(t, DDLStrategyGhost, strategy)
}
{
strategy, err := ValidateDDLStrategy("pt-osc")
assert.NoError(t, err)
assert.Equal(t, DDLStrategyPTOSC, strategy)
}
{
strategy, err := ValidateDDLStrategy("")
assert.NoError(t, err)
assert.Equal(t, DDLStrategyNormal, strategy)
}
{
_, err := ValidateDDLStrategy("other")
assert.Error(t, err)
}
}

func TestIsOnlineDDLUUID(t *testing.T) {
for i := 0; i < 20; i++ {
uuid, err := CreateUUID()
Expand Down
1 change: 1 addition & 0 deletions go/vt/sqlparser/expression_rewriting.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func (er *expressionRewriter) sysVarRewrite(cursor *Cursor, node *ColName) {
sysvars.SQLSelectLimit.Name,
sysvars.TransactionMode.Name,
sysvars.Workload.Name,
sysvars.DDLStrategy.Name,
sysvars.ReadAfterWriteGTID.Name,
sysvars.ReadAfterWriteTimeOut.Name,
sysvars.SessionTrackGTIDs.Name:
Expand Down
6 changes: 6 additions & 0 deletions go/vt/sqlparser/expression_rewriting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
type myTestCase struct {
in, expected string
liid, db, foundRows, rowCount, rawGTID, rawTimeout, sessTrackGTID bool
ddlStrategy bool
udv int
autocommit, clientFoundRows, skipQueryPlanCache bool
sqlSelectLimit, transactionMode, workload bool
Expand Down Expand Up @@ -145,6 +146,10 @@ func TestRewrites(in *testing.T) {
in: `select (select (select (select (select (select last_insert_id()))))) as x`,
expected: "select :__lastInsertId as x from dual",
liid: true,
}, {
in: `select * from user where col = @@ddl_strategy`,
expected: "select * from user where col = :__vtddl_strategy",
ddlStrategy: true,
}, {
in: `select * from user where col = @@read_after_write_gtid OR col = @@read_after_write_timeout OR col = @@session_track_gtids`,
expected: "select * from user where col = :__vtread_after_write_gtid or col = :__vtread_after_write_timeout or col = :__vtsession_track_gtids",
Expand Down Expand Up @@ -177,6 +182,7 @@ func TestRewrites(in *testing.T) {
assert.Equal(tc.sqlSelectLimit, result.NeedsSysVar(sysvars.SQLSelectLimit.Name), "should need :__vtsqlSelectLimit")
assert.Equal(tc.transactionMode, result.NeedsSysVar(sysvars.TransactionMode.Name), "should need :__vttransactionMode")
assert.Equal(tc.workload, result.NeedsSysVar(sysvars.Workload.Name), "should need :__vtworkload")
assert.Equal(tc.ddlStrategy, result.NeedsSysVar(sysvars.DDLStrategy.Name), "should need ddlStrategy")
assert.Equal(tc.rawGTID, result.NeedsSysVar(sysvars.ReadAfterWriteGTID.Name), "should need rawGTID")
assert.Equal(tc.rawTimeout, result.NeedsSysVar(sysvars.ReadAfterWriteTimeOut.Name), "should need rawTimeout")
assert.Equal(tc.sessTrackGTID, result.NeedsSysVar(sysvars.SessionTrackGTIDs.Name), "should need sessTrackGTID")
Expand Down
3 changes: 3 additions & 0 deletions go/vt/sysvars/sysvars.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ var (
Workload = SystemVariable{Name: "workload", IdentifierAsString: true}
Charset = SystemVariable{Name: "charset", Default: utf8, IdentifierAsString: true}
Names = SystemVariable{Name: "names", Default: utf8, IdentifierAsString: true}
// Online DDL
DDLStrategy = SystemVariable{Name: "ddl_strategy", IdentifierAsString: true}

// Read After Write settings
ReadAfterWriteGTID = SystemVariable{Name: "read_after_write_gtid"}
Expand All @@ -67,6 +69,7 @@ var (
TransactionReadOnly,
SQLSelectLimit,
TransactionMode,
DDLStrategy,
Workload,
Charset,
Names,
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ type noopVCursor struct {
ctx context.Context
}

func (t noopVCursor) SetDDLStrategy(strategy sqlparser.DDLStrategy) {
panic("implement me")
}

func (t noopVCursor) SetReadAfterWriteGTID(s string) {
panic("implement me")
}
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ type (
SetWorkload(querypb.ExecuteOptions_Workload)
SetFoundRows(uint64)

SetDDLStrategy(sqlparser.DDLStrategy)

// SetReadAfterWriteGTID sets the GTID that the user expects a replica to have caught up with before answering a query
SetReadAfterWriteGTID(string)
SetReadAfterWriteTimeout(float64)
Expand Down
11 changes: 11 additions & 0 deletions go/vt/vtgate/engine/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"vitess.io/vitess/go/vt/key"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
)
Expand Down Expand Up @@ -398,6 +399,16 @@ func (svss *SysVarSetAware) Execute(vcursor VCursor, env evalengine.ExpressionEn
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid workload: %s", str)
}
vcursor.Session().SetWorkload(querypb.ExecuteOptions_Workload(out))
case sysvars.DDLStrategy.Name:
str, err := svss.evalAsString(env)
if err != nil {
return err
}
strategy, err := schema.ValidateDDLStrategy(str)
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid DDL strategy: %s", str)
}
vcursor.Session().SetDDLStrategy(strategy)
case sysvars.Charset.Name, sysvars.Names.Name:
str, err := svss.evalAsString(env)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ func (e *Executor) addNeededBindVars(bindVarNeeds *sqlparser.BindVarNeeds, bindV
v = options.GetWorkload().String()
})
bindVars[key] = sqltypes.StringBindVariable(v)
case sysvars.DDLStrategy.Name:
bindVars[key] = sqltypes.StringBindVariable(session.DDLStrategy)
case sysvars.ReadAfterWriteGTID.Name:
var v string
ifReadAfterWriteExist(session, func(raw *vtgatepb.ReadAfterWrite) {
Expand Down
5 changes: 4 additions & 1 deletion go/vt/vtgate/executor_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,8 @@ func TestSelectSystemVariables(t *testing.T) {

sql := "select @@autocommit, @@client_found_rows, @@skip_query_plan_cache, " +
"@@sql_select_limit, @@transaction_mode, @@workload, @@read_after_write_gtid, " +
"@@read_after_write_timeout, @@session_track_gtids"
"@@read_after_write_timeout, @@session_track_gtids, @@ddl_strategy"

result, err := executorExec(executor, sql, map[string]*querypb.BindVariable{})
wantResult := &sqltypes.Result{
Fields: []*querypb.Field{
Expand All @@ -341,6 +342,7 @@ func TestSelectSystemVariables(t *testing.T) {
{Name: "@@read_after_write_gtid", Type: sqltypes.VarBinary},
{Name: "@@read_after_write_timeout", Type: sqltypes.Float64},
{Name: "@@session_track_gtids", Type: sqltypes.VarBinary},
{Name: "@@ddl_strategy", Type: sqltypes.VarBinary},
},
RowsAffected: 1,
Rows: [][]sqltypes.Value{{
Expand All @@ -355,6 +357,7 @@ func TestSelectSystemVariables(t *testing.T) {
sqltypes.NewVarBinary("a fine gtid"),
sqltypes.NewFloat64(13),
sqltypes.NewVarBinary("own_gtid"),
sqltypes.NewVarBinary(""),
}},
}
require.NoError(t, err)
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vtgate/safe_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/golang/protobuf/proto"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"

querypb "vitess.io/vitess/go/vt/proto/query"
Expand Down Expand Up @@ -453,6 +454,13 @@ func (session *SafeSession) ResetShard(tabletAlias *topodatapb.TabletAlias) erro
return nil
}

// SetDDLStrategy set the DDLStrategy setting.
func (session *SafeSession) SetDDLStrategy(strategy sqlparser.DDLStrategy) {
session.mu.Lock()
defer session.mu.Unlock()
session.DDLStrategy = string(strategy)
}

// SetReadAfterWriteGTID set the ReadAfterWriteGtid setting.
func (session *SafeSession) SetReadAfterWriteGTID(vtgtid string) {
session.mu.Lock()
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,11 @@ func (vc *vcursorImpl) SetFoundRows(foundRows uint64) {
vc.safeSession.foundRowsHandled = true
}

// SetReadAfterWriteGTID implements the SessionActions interface
func (vc *vcursorImpl) SetDDLStrategy(strategy sqlparser.DDLStrategy) {
vc.safeSession.SetDDLStrategy(strategy)
}

// SetReadAfterWriteGTID implements the SessionActions interface
func (vc *vcursorImpl) SetReadAfterWriteGTID(vtgtid string) {
vc.safeSession.SetReadAfterWriteGTID(vtgtid)
Expand Down
3 changes: 3 additions & 0 deletions proto/vtgate.proto
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ message Session {

// read_after_write tracks the ReadAfterWrite settings for this session.
ReadAfterWrite read_after_write = 20;

// DDL strategy
string DDLStrategy = 21;
}

// ReadAfterWrite contains information regarding gtid set and timeout
Expand Down