Skip to content

Commit

Permalink
ddl: init global vars from system tables before start domain (pingcap…
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter authored and hawkingrei committed Aug 1, 2024
1 parent 4a723af commit df0a776
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 74 deletions.
81 changes: 49 additions & 32 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,12 @@ type Domain struct {
// TODO: use Run for each process in future pr
wg *util.WaitGroupEnhancedWrapper
statsUpdating atomicutil.Int32
cancelFns struct {
// this is the parent context of DDL, and also used by other loops such as closestReplicaReadCheckLoop.
// there are other top level contexts in the domain, such as the ones used in
// InitDistTaskLoop and loadStatsWorker, domain only stores the cancelFns of them.
// TODO unify top level context.
ctx context.Context
cancelFns struct {
mu sync.Mutex
fns []context.CancelFunc
}
Expand Down Expand Up @@ -1261,12 +1266,19 @@ func newEtcdCli(addrs []string, ebd kv.EtcdBackend) (*clientv3.Client, error) {
return cli, err
}

// Init initializes a domain.
// Init initializes a domain. after return, session can be used to do DMLs but not
// DDLs which can be used after domain Start.
func (do *Domain) Init(
ddlLease time.Duration,
sysExecutorFactory func(*Domain) (pools.Resource, error),
ddlInjector func(ddl.DDL, ddl.Executor, *infoschema.InfoCache) *schematracker.Checker,
) error {
// TODO there are many place set ddlLease to 0, remove them completely, we want
// UT and even local uni-store to run similar code path as normal.
if ddlLease == 0 {
ddlLease = time.Second
}

do.sysExecutorFactory = sysExecutorFactory
perfschema.Init()
if ebd, ok := do.store.(kv.EtcdBackend); ok {
Expand Down Expand Up @@ -1295,18 +1307,8 @@ func (do *Domain) Init(
}
}

// TODO: Here we create new sessions with sysFac in DDL,
// which will use `do` as Domain instead of call `domap.Get`.
// That's because `domap.Get` requires a lock, but before
// we initialize Domain finish, we can't require that again.
// After we remove the lazy logic of creating Domain, we
// can simplify code here.
sysFac := func() (pools.Resource, error) {
return sysExecutorFactory(do)
}
sysCtxPool := pools.NewResourcePool(sysFac, 512, 512, resourceIdleTimeout)

ctx, cancelFunc := context.WithCancel(context.Background())
do.ctx = ctx
do.cancelFns.mu.Lock()
do.cancelFns.fns = append(do.cancelFns.fns, cancelFunc)
do.cancelFns.mu.Unlock()
Expand Down Expand Up @@ -1372,9 +1374,6 @@ func (do *Domain) Init(
}
do.isLostConnectionToPD.Store(0)
}

do.wg.Add(1)
go do.serverIDKeeper()
} else {
// set serverID for standalone deployment to enable 'KILL'.
atomic.StoreUint64(&do.serverID, serverIDForStandalone)
Expand All @@ -1396,19 +1395,6 @@ func (do *Domain) Init(
return err
}

// step 4: start the ddl after the domain reload, avoiding some internal sql running before infoSchema construction.
err = do.ddl.Start(sysCtxPool)
if err != nil {
return err
}
do.minJobIDRefresher = do.ddl.GetMinJobIDRefresher()

// TODO there are many place set ddlLease to 0, remove them completely, we want
// UT and even local uni-store to run similar code path as normal.
if ddlLease == 0 {
ddlLease = time.Second
}

sub := time.Since(startReloadTime)
// The reload(in step 2) operation takes more than ddlLease and a new reload operation was not performed,
// the next query will respond by ErrInfoSchemaExpired error. So we do a new reload to update schemaValidator.latestSchemaExpire.
Expand All @@ -1419,27 +1405,58 @@ func (do *Domain) Init(
return err
}
}
return nil
}

// Start starts the domain. After start, DDLs can be executed using session, see
// Init also.
func (do *Domain) Start() error {
gCfg := config.GetGlobalConfig()
if gCfg.EnableGlobalKill && do.etcdClient != nil {
do.wg.Add(1)
go do.serverIDKeeper()
}

// TODO: Here we create new sessions with sysFac in DDL,
// which will use `do` as Domain instead of call `domap.Get`.
// That's because `domap.Get` requires a lock, but before
// we initialize Domain finish, we can't require that again.
// After we remove the lazy logic of creating Domain, we
// can simplify code here.
sysFac := func() (pools.Resource, error) {
return do.sysExecutorFactory(do)
}
sysCtxPool := pools.NewResourcePool(sysFac, 512, 512, resourceIdleTimeout)

// start the ddl after the domain reload, avoiding some internal sql running before infoSchema construction.
err := do.ddl.Start(sysCtxPool)
if err != nil {
return err
}
do.minJobIDRefresher = do.ddl.GetMinJobIDRefresher()

// Local store needs to get the change information for every DDL state in each session.
do.wg.Run(func() {
do.loadSchemaInLoop(ctx, ddlLease)
do.loadSchemaInLoop(do.ctx, do.ddl.GetLease())
}, "loadSchemaInLoop")
do.wg.Run(do.mdlCheckLoop, "mdlCheckLoop")
do.wg.Run(do.topNSlowQueryLoop, "topNSlowQueryLoop")
do.wg.Run(do.infoSyncerKeeper, "infoSyncerKeeper")
do.wg.Run(do.globalConfigSyncerKeeper, "globalConfigSyncerKeeper")
do.wg.Run(do.runawayStartLoop, "runawayStartLoop")
do.wg.Run(do.requestUnitsWriterLoop, "requestUnitsWriterLoop")
skipRegisterToDashboard := gCfg.SkipRegisterToDashboard
if !skipRegisterToDashboard {
do.wg.Run(do.topologySyncerKeeper, "topologySyncerKeeper")
}
pdCli := do.GetPDClient()
if pdCli != nil {
do.wg.Run(func() {
do.closestReplicaReadCheckLoop(ctx, pdCli)
do.closestReplicaReadCheckLoop(do.ctx, pdCli)
}, "closestReplicaReadCheckLoop")
}

err = do.initLogBackup(ctx, pdCli)
err = do.initLogBackup(do.ctx, pdCli)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func TestInfo(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/domain/MockReplaceDDL", `return(true)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/NoDDLDispatchLoop", `return(true)`))
require.NoError(t, dom.Init(ddlLease, sysMockFactory, nil))
require.NoError(t, dom.Start())
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/NoDDLDispatchLoop"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/domain/MockReplaceDDL"))

Expand Down
2 changes: 1 addition & 1 deletion pkg/expression/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,6 @@ func AssertLocationWithSessionVars(ctxLoc *time.Location, vars *variable.Session
stmtLocStr := vars.StmtCtx.TimeZone().String()
intest.Assert(ctxLocStr == varsLocStr && ctxLocStr == stmtLocStr,
"location mismatch, ctxLoc: %s, varsLoc: %s, stmtLoc: %s",
ctxLoc.String(), ctxLocStr, stmtLocStr,
ctxLocStr, varsLocStr, stmtLocStr,
)
}
Loading

0 comments on commit df0a776

Please sign in to comment.