Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl : support alter table xx cache operations #29022

Merged
merged 24 commits into from
Oct 25, 2021
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5881,6 +5881,34 @@ func (s *testDBSuite2) TestTableLocksLostCommit(c *C) {
tk.MustExec("unlock tables")
}

// test alter table cache
func (s *testDBSuite2) TestAlterTableCache(c *C) {
JayLZhou marked this conversation as resolved.
Show resolved Hide resolved
tk := testkit.NewTestKit(c, s.store)
tk2 := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk2.MustExec("use test")
/* Test of cache table */
tk.MustExec("create table t1 ( n int auto_increment primary key)")
tk.MustExec("alter table t1 cache")
checkTableCache(c, tk.Se, "test", "t1")
tk.MustExec("drop table if exists t1")
/*Test can't skip schema checker*/
tk.MustExec("drop table if exists t1,t2")
tk.MustExec("CREATE TABLE t1 (a int)")
tk.MustExec("CREATE TABLE t2 (a int)")
tk.MustExec("begin")
tk.MustExec("insert into t1 set a=1;")
tk2.MustExec("alter table t1 cache;")
_, err := tk.Exec("commit")
c.Assert(terror.ErrorEqual(domain.ErrInfoSchemaChanged, err), IsTrue)
///*Test can skip schema checker */
tk.MustExec("begin")
tk.MustExec("insert into t1 set a=2;")
tk2.MustExec("alter table t2 cache")
tk.MustExec("commit")
}

// test write local lock
func (s *testDBSuite2) TestWriteLocal(c *C) {
tk := testkit.NewTestKit(c, s.store)
Expand Down Expand Up @@ -6414,7 +6442,13 @@ func checkTableLock(c *C, se session.Session, dbName, tableName string, lockTp m
c.Assert(tb.Meta().Lock, IsNil)
}
}

func checkTableCache(c *C, se session.Session, dbName, tableName string) {
tb := testGetTableByName(c, se, dbName, tableName)
dom := domain.GetDomain(se)
err := dom.Reload()
c.Assert(err, IsNil)
c.Assert(tb.Meta().TableCacheStatusType, Equals, model.TableCacheStatusENABLE)
}
func (s *testDBSuite2) TestDDLWithInvalidTableInfo(c *C) {
tk := testkit.NewTestKit(c, s.store)

Expand Down
19 changes: 19 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2819,6 +2819,8 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast
err = d.AlterTablePartitionAttributes(sctx, ident, spec)
case ast.AlterTablePartitionOptions:
err = d.AlterTablePartitionOptions(sctx, ident, spec)
case ast.AlterTableCache:
err = d.AlterTableCache(sctx, ident)
default:
// Nothing to do now.
}
Expand Down Expand Up @@ -6587,3 +6589,20 @@ func (d *ddl) AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.AlterPlacem
err = d.callHookOnChanged(err)
return errors.Trace(err)
}
func (d *ddl) AlterTableCache(ctx sessionctx.Context, ti ast.Ident) (err error) {
schema, t, err := d.getSchemaAndTableByIdent(ctx, ti)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check if it is already cached. If it is, we don't even need to add a job.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is checking the type of the table (is it a view) one of the feature compatibilities you mentioned?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i check if is a already cached in onAlterTableCache function. if needed i can change its position

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typically, we check it twice. One before creating the job and another in executing the job.
Also, please notice the other question I asked above.

if err != nil {
return err
}
job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about SchemaName?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...i not use it. so i am not transfer it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use it in some places, such as error reporting and show DDL jobs. It's better to follow the convention.

Type: model.ActionAlterCacheTable,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{},
}

err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}
2 changes: 2 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,8 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
ver, err = onAlterPlacementPolicy(d, t, job)
case model.ActionAlterTablePartitionPolicy:
ver, err = onAlterTablePartitionOptions(t, job)
case model.ActionAlterCacheTable:
ver, err = onAlterCacheTable(t, job)
default:
// Invalid job, cancel it.
job.State = model.JobStateCancelled
Expand Down
34 changes: 34 additions & 0 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -1421,3 +1421,37 @@ func updateLabelRules(job *model.Job, tblInfo *model.TableInfo, oldRules map[str
patch := label.NewRulePatch(newRules, oldRuleIDs)
return infosync.UpdateLabelRules(context.TODO(), patch)
}
func onAlterCacheTable(t *meta.Meta, job *model.Job) (ver int64, err error) {
tbInfo, err := getTableInfoAndCancelFaultJob(t, job, job.SchemaID)
if err != nil {
return 0, errors.Trace(err)
}
// If the table is already in the cache state
if tbInfo.TableCacheStatusType == model.TableCacheStatusENABLE {
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo)
return ver, nil
}

switch tbInfo.TableCacheStatusType {
case model.TableCacheStatusDISABLE:
// disable -> switching
tbInfo.TableCacheStatusType = model.TableCacheStatusSWITCHING
ver, err = updateVersionAndTableInfoWithCheck(t, job, tbInfo, true)
if err != nil {
return ver, err
}
case model.TableCacheStatusSWITCHING:
// switching -> enable
tbInfo.TableCacheStatusType = model.TableCacheStatusENABLE
ver, err = updateVersionAndTableInfoWithCheck(t, job, tbInfo, true)
if err != nil {
return ver, err
}
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo)
default:
job.State = model.JobStateCancelled
err = ErrInvalidDDLState.GenWithStackByArgs("alter table cache", tbInfo.TableCacheStatusType.String())
}
return ver, err
}
35 changes: 35 additions & 0 deletions ddl/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,41 @@ func (s *testTableSuite) TestTable(c *C) {
testCheckTableState(c, d, dbInfo1, tblInfo, model.StatePublic)
testCheckJobDone(c, d, job, true)
checkTableLockedTest(c, d, dbInfo1, tblInfo, d.GetID(), ctx.GetSessionVars().ConnectionID, model.TableLockWrite)
// for alter cache table
job = testAlterCacheTable(c, ctx, d, dbInfo1.ID, tblInfo)
testCheckTableState(c, d, dbInfo1, tblInfo, model.StatePublic)
testCheckJobDone(c, d, job, true)
checkTableCacheTest(c, d, dbInfo1, tblInfo)
}

func checkTableCacheTest(c *C, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo) {
err := kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
info, err := t.GetTable(dbInfo.ID, tblInfo.ID)
c.Assert(err, IsNil)
c.Assert(info, NotNil)
c.Assert(info.TableCacheStatusType, NotNil)
c.Assert(info.TableCacheStatusType, Equals, model.TableCacheStatusENABLE)
return nil
})
c.Assert(err, IsNil)
}

func testAlterCacheTable(c *C, ctx sessionctx.Context, d *ddl, newSchemaID int64, tblInfo *model.TableInfo) *model.Job {

job := &model.Job{
SchemaID: newSchemaID,
TableID: tblInfo.ID,
Type: model.ActionAlterCacheTable,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{},
}
err := d.doDDLJob(ctx, job)
c.Assert(err, IsNil)

v := getSchemaVer(c, ctx)
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v})
return job
}

// for drop indexes
Expand Down
4 changes: 4 additions & 0 deletions parser/model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ const (
ActionDropPlacementPolicy ActionType = 53
ActionAlterTablePartitionPolicy ActionType = 54
ActionModifySchemaDefaultPlacement ActionType = 55
ActionAlterCacheTable ActionType = 56
ActionAlterNoCacheTable ActionType = 57
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
)

var actionMap = map[ActionType]string{
Expand Down Expand Up @@ -142,6 +144,8 @@ var actionMap = map[ActionType]string{
ActionAlterPlacementPolicy: "alter placement policy",
ActionDropPlacementPolicy: "drop placement policy",
ActionModifySchemaDefaultPlacement: "modify schema default placement",
ActionAlterCacheTable: "alter cache table",
ActionAlterNoCacheTable: "alter nocache table",
}

// String return current ddl action in string
Expand Down
26 changes: 23 additions & 3 deletions parser/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,30 @@ type TableInfo struct {
// It's true when the engine of the table is TiFlash only.
IsColumnar bool `json:"is_columnar"`

TempTableType `json:"temp_table_type"`
TempTableType `json:"temp_table_type"`
TableCacheStatusType `json:"cache_table_status"`
PlacementPolicyRef *PolicyRefInfo `json:"policy_ref_info"`
DirectPlacementOpts *PlacementSettings `json:"placement_settings"`
}
type TableCacheStatusType int

PlacementPolicyRef *PolicyRefInfo `json:"policy_ref_info"`
DirectPlacementOpts *PlacementSettings `json:"placement_settings"`
const (
TableCacheStatusDISABLE TableCacheStatusType = iota
TableCacheStatusENABLE
TableCacheStatusSWITCHING
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
)

func (t TableCacheStatusType) String() string {
switch t {
case TableCacheStatusDISABLE:
return "DISABLE"
case TableCacheStatusENABLE:
return "ENABLE"
case TableCacheStatusSWITCHING:
return "SWITCHING"
default:
return ""
}
}

type TempTableType byte
Expand Down