Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
Howie59 committed Apr 28, 2021
2 parents 9f13862 + a1d4d9d commit 6a86dcd
Show file tree
Hide file tree
Showing 90 changed files with 1,538 additions and 806 deletions.
124 changes: 123 additions & 1 deletion ddl/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,17 @@ package ddl

import (
"context"
"fmt"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

// Interceptor is used for DDL.
Expand All @@ -39,7 +46,7 @@ func (bi *BaseInterceptor) OnGetInfoSchema(ctx sessionctx.Context, is infoschema
type Callback interface {
// OnChanged is called after a ddl statement is finished.
OnChanged(err error) error
// OnSchemaStateChange is called after a schema state is changed.
// OnSchemaStateChanged is called after a schema state is changed.
OnSchemaStateChanged()
// OnJobRunBefore is called before running job.
OnJobRunBefore(job *model.Job)
Expand Down Expand Up @@ -77,3 +84,118 @@ func (c *BaseCallback) OnJobUpdated(job *model.Job) {
func (c *BaseCallback) OnWatched(ctx context.Context) {
// Nothing to do.
}

// DomainReloader is used to avoid import loop.
type DomainReloader interface {
Reload() error
}

// ****************************** Start of Customized DDL Callback Instance ****************************************

// DefaultCallback is the default callback that TiDB will use.
type DefaultCallback struct {
*BaseCallback
do DomainReloader
}

// OnChanged overrides ddl Callback interface.
func (c *DefaultCallback) OnChanged(err error) error {
if err != nil {
return err
}
logutil.BgLogger().Info("performing DDL change, must reload")

err = c.do.Reload()
if err != nil {
logutil.BgLogger().Error("performing DDL change failed", zap.Error(err))
}

return nil
}

// OnSchemaStateChanged overrides the ddl Callback interface.
func (c *DefaultCallback) OnSchemaStateChanged() {
err := c.do.Reload()
if err != nil {
logutil.BgLogger().Error("domain callback failed on schema state changed", zap.Error(err))
}
}

func newDefaultCallBack(do DomainReloader) Callback {
return &DefaultCallback{do: do}
}

// ****************************** End of Default DDL Callback Instance *********************************************

// ****************************** Start of CTC DDL Callback Instance ***********************************************

// ctcCallback is the customized callback that TiDB will use.
// ctc is named from column type change, here after we call them ctc for short.
type ctcCallback struct {
*BaseCallback
do DomainReloader
}

// OnChanged overrides ddl Callback interface.
func (c *ctcCallback) OnChanged(err error) error {
if err != nil {
return err
}
logutil.BgLogger().Info("performing DDL change, must reload")

err = c.do.Reload()
if err != nil {
logutil.BgLogger().Error("performing DDL change failed", zap.Error(err))
}
return nil
}

// OnSchemaStateChanged overrides the ddl Callback interface.
func (c *ctcCallback) OnSchemaStateChanged() {
err := c.do.Reload()
if err != nil {
logutil.BgLogger().Error("domain callback failed on schema state changed", zap.Error(err))
}
}

// OnJobRunBefore is used to run the user customized logic of `onJobRunBefore` first.
func (c *ctcCallback) OnJobRunBefore(job *model.Job) {
log.Info("on job run before", zap.String("job", job.String()))
// Only block the ctc type ddl here.
if job.Type != model.ActionModifyColumn {
return
}
switch job.SchemaState {
case model.StateDeleteOnly, model.StateWriteOnly, model.StateWriteReorganization:
logutil.BgLogger().Warn(fmt.Sprintf("[DDL_HOOK] Hang for 0.5 seconds on %s state triggered", job.SchemaState.String()))
time.Sleep(500 * time.Millisecond)
}
}

func newCTCCallBack(do DomainReloader) Callback {
return &ctcCallback{do: do}
}

// ****************************** End of CTC DDL Callback Instance ***************************************************

var (
customizedCallBackRegisterMap = map[string]func(do DomainReloader) Callback{}
)

func init() {
// init the callback register map.
customizedCallBackRegisterMap["default_hook"] = newDefaultCallBack
customizedCallBackRegisterMap["ctc_hook"] = newCTCCallBack
}

// GetCustomizedHook get the hook registered in the hookMap.
func GetCustomizedHook(s string) (func(do DomainReloader) Callback, error) {
s = strings.ToLower(s)
s = strings.TrimSpace(s)
fact, ok := customizedCallBackRegisterMap[s]
if !ok {
logutil.BgLogger().Error("bad ddl hook " + s)
return nil, errors.Errorf("ddl hook `%s` is not found in hook registered map", s)
}
return fact, nil
}
32 changes: 24 additions & 8 deletions ddl/callback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ import (
"context"

. "github.com/pingcap/check"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

Expand All @@ -38,6 +38,7 @@ func (ti *TestInterceptor) OnGetInfoSchema(ctx sessionctx.Context, is infoschema
return ti.BaseInterceptor.OnGetInfoSchema(ctx, is)
}

// TestDDLCallback is used to customize user callback themselves.
type TestDDLCallback struct {
*BaseCallback
// We recommended to pass the domain parameter to the test ddl callback, it will ensure
Expand All @@ -51,16 +52,33 @@ type TestDDLCallback struct {
onWatched func(ctx context.Context)
}

// OnChanged mock the same behavior with the main DDL hook.
func (tc *TestDDLCallback) OnChanged(err error) error {
if err != nil {
return err
}
logutil.BgLogger().Info("performing DDL change, must reload")
if tc.Do != nil {
err = tc.Do.Reload()
if err != nil {
logutil.BgLogger().Error("performing DDL change failed", zap.Error(err))
}
}
return nil
}

// OnSchemaStateChanged mock the same behavior with the main ddl hook.
func (tc *TestDDLCallback) OnSchemaStateChanged() {
if tc.Do != nil {
if err := tc.Do.Reload(); err != nil {
log.Warn("reload failed on schema state changed", zap.Error(err))
logutil.BgLogger().Warn("reload failed on schema state changed", zap.Error(err))
}
}
}

// OnJobRunBefore is used to run the user customized logic of `onJobRunBefore` first.
func (tc *TestDDLCallback) OnJobRunBefore(job *model.Job) {
log.Info("on job run before", zap.String("job", job.String()))
logutil.BgLogger().Info("on job run before", zap.String("job", job.String()))
if tc.OnJobRunBeforeExported != nil {
tc.OnJobRunBeforeExported(job)
return
Expand All @@ -73,8 +91,9 @@ func (tc *TestDDLCallback) OnJobRunBefore(job *model.Job) {
tc.BaseCallback.OnJobRunBefore(job)
}

// OnJobUpdated is used to run the user customized logic of `OnJobUpdated` first.
func (tc *TestDDLCallback) OnJobUpdated(job *model.Job) {
log.Info("on job updated", zap.String("job", job.String()))
logutil.BgLogger().Info("on job updated", zap.String("job", job.String()))
if tc.OnJobUpdatedExported != nil {
tc.OnJobUpdatedExported(job)
return
Expand All @@ -87,6 +106,7 @@ func (tc *TestDDLCallback) OnJobUpdated(job *model.Job) {
tc.BaseCallback.OnJobUpdated(job)
}

// OnWatched is used to run the user customized logic of `OnWatched` first.
func (tc *TestDDLCallback) OnWatched(ctx context.Context) {
if tc.onWatched != nil {
tc.onWatched(ctx)
Expand All @@ -103,7 +123,3 @@ func (s *testDDLSuite) TestCallback(c *C) {
cb.OnJobUpdated(nil)
cb.OnWatched(context.TODO())
}

type DomainReloader interface {
Reload() error
}
12 changes: 11 additions & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,14 @@ type DDL interface {
OwnerManager() owner.Manager
// GetID gets the ddl ID.
GetID() string
// GetTableMaxRowID gets the max row ID of a normal table or a partition.
// GetTableMaxHandle gets the max row ID of a normal table or a partition.
GetTableMaxHandle(startTS uint64, tbl table.PhysicalTable) (kv.Handle, bool, error)
// SetBinlogClient sets the binlog client for DDL worker. It's exported for testing.
SetBinlogClient(*pumpcli.PumpsClient)
// GetHook gets the hook. It's exported for testing.
GetHook() Callback
// SetHook sets the hook.
SetHook(h Callback)
}

type limitJobTask struct {
Expand Down Expand Up @@ -625,6 +627,14 @@ func (d *ddl) GetHook() Callback {
return d.mu.hook
}

// SetHook set the customized hook.
func (d *ddl) SetHook(h Callback) {
d.mu.Lock()
defer d.mu.Unlock()

d.mu.hook = h
}

func (d *ddl) startCleanDeadTableLock() {
defer func() {
goutil.Recover(metrics.LabelDDL, "startCleanDeadTableLock", nil, false)
Expand Down
8 changes: 0 additions & 8 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,6 @@ type DDLForTest interface {
SetInterceptor(h Interceptor)
}

// SetHook implements DDL.SetHook interface.
func (d *ddl) SetHook(h Callback) {
d.mu.Lock()
defer d.mu.Unlock()

d.mu.hook = h
}

// SetInterceptor implements DDL.SetInterceptor interface.
func (d *ddl) SetInterceptor(i Interceptor) {
d.mu.Lock()
Expand Down
4 changes: 2 additions & 2 deletions ddl/split_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -138,7 +138,7 @@ func waitScatterRegionFinish(ctx context.Context, store kv.SplittableStore, regi
if err != nil {
logutil.BgLogger().Warn("[ddl] wait scatter region failed", zap.Uint64("regionID", regionID), zap.Error(err))
// We don't break for PDError because it may caused by ScatterRegion request failed.
if _, ok := errors.Cause(err).(*tikvstore.PDError); !ok {
if _, ok := errors.Cause(err).(*tikverr.PDError); !ok {
break
}
}
Expand Down
1 change: 1 addition & 0 deletions docs/design/2020-07-07-change-column-types.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ This requires changing the state of `colA` and `idxA` to `StatePublic`, and then
### Compatibility issues with MySQL

* Considering the complexity of supporting clustered-index and currently TiDB does not fully support clustered-index, so temporarily this feature does not support type modification for columns with the primary key.
* In addition, we temporarily do not support column type change on three features of partition table, generated column, and expression index in the first stage of implementation.

### Compatibility issues with TiDB

Expand Down
37 changes: 7 additions & 30 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,34 +648,6 @@ func (do *Domain) Close() {
logutil.BgLogger().Info("domain closed", zap.Duration("take time", time.Since(startTime)))
}

type ddlCallback struct {
ddl.BaseCallback
do *Domain
}

// OnChanged overrides ddl Callback interface.
func (c *ddlCallback) OnChanged(err error) error {
if err != nil {
return err
}
logutil.BgLogger().Info("performing DDL change, must reload")

err = c.do.Reload()
if err != nil {
logutil.BgLogger().Error("performing DDL change failed", zap.Error(err))
}

return nil
}

// OnSchemaStateChange overrides the ddl Callback interface.
func (c *ddlCallback) OnSchemaStateChanged() {
err := c.do.Reload()
if err != nil {
logutil.BgLogger().Error("domain callback failed on schema state changed", zap.Error(err))
}
}

const resourceIdleTimeout = 3 * time.Minute // resources in the ResourcePool will be recycled after idleTimeout

// NewDomain creates a new domain. Should not create multiple domains for the same store.
Expand Down Expand Up @@ -746,7 +718,12 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R
sysCtxPool := pools.NewResourcePool(sysFac, 2, 2, resourceIdleTimeout)
ctx, cancelFunc := context.WithCancel(context.Background())
do.cancel = cancelFunc
callback := &ddlCallback{do: do}
var callback ddl.Callback
newCallbackFunc, err := ddl.GetCustomizedHook("default_hook")
if err != nil {
return errors.Trace(err)
}
callback = newCallbackFunc(do)
d := do.ddl
do.ddl = ddl.NewDDL(
ctx,
Expand All @@ -756,7 +733,7 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R
ddl.WithHook(callback),
ddl.WithLease(ddlLease),
)
err := do.ddl.Start(sysCtxPool)
err = do.ddl.Start(sysCtxPool)
if err != nil {
return err
}
Expand Down
5 changes: 4 additions & 1 deletion domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/types"
util2 "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/hack"
Expand Down Expand Up @@ -252,7 +253,9 @@ func UpdateTiFlashTableSyncProgress(ctx context.Context, tid int64, progress flo
return nil
}
key := fmt.Sprintf("%s/%v", TiFlashTableSyncProgressPath, tid)
return util.PutKVToEtcd(ctx, is.etcdCli, keyOpDefaultRetryCnt, key, strconv.FormatFloat(progress, 'f', 2, 64))
// truncate progress with 2 decimal digits so that it will not be rounded to 1 when the progress is 0.995
progressString := types.TruncateFloatToString(progress, 2)
return util.PutKVToEtcd(ctx, is.etcdCli, keyOpDefaultRetryCnt, key, progressString)
}

// DeleteTiFlashTableSyncProgress is used to delete the tiflash table replica sync progress.
Expand Down
3 changes: 2 additions & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
tikverr "github.com/pingcap/tidb/store/tikv/error"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/util"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -637,7 +638,7 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (E
}
txnCtx := sessVars.TxnCtx
var newForUpdateTS uint64
if deadlock, ok := errors.Cause(err).(*tikvstore.ErrDeadlock); ok {
if deadlock, ok := errors.Cause(err).(*tikverr.ErrDeadlock); ok {
if !deadlock.IsRetryable {
return nil, ErrDeadlock
}
Expand Down
Loading

0 comments on commit 6a86dcd

Please sign in to comment.