diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index 49866feb5b0fe..e78bad22a19c3 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -166,7 +166,12 @@ type Domain struct { // TODO: use Run for each process in future pr wg *util.WaitGroupEnhancedWrapper statsUpdating atomicutil.Int32 - cancelFns struct { + // this is the parent context of DDL, and also used by other loops such as closestReplicaReadCheckLoop. + // there are other top level contexts in the domain, such as the ones used in + // InitDistTaskLoop and loadStatsWorker, domain only stores the cancelFns of them. + // TODO unify top level context. + ctx context.Context + cancelFns struct { mu sync.Mutex fns []context.CancelFunc } @@ -1260,12 +1265,19 @@ func newEtcdCli(addrs []string, ebd kv.EtcdBackend) (*clientv3.Client, error) { return cli, err } -// Init initializes a domain. +// Init initializes a domain. after return, session can be used to do DMLs but not +// DDLs which can be used after domain Start. func (do *Domain) Init( ddlLease time.Duration, sysExecutorFactory func(*Domain) (pools.Resource, error), ddlInjector func(ddl.DDL, ddl.Executor, *infoschema.InfoCache) *schematracker.Checker, ) error { + // TODO there are many place set ddlLease to 0, remove them completely, we want + // UT and even local uni-store to run similar code path as normal. + if ddlLease == 0 { + ddlLease = time.Second + } + do.sysExecutorFactory = sysExecutorFactory perfschema.Init() if ebd, ok := do.store.(kv.EtcdBackend); ok { @@ -1294,18 +1306,8 @@ func (do *Domain) Init( } } - // TODO: Here we create new sessions with sysFac in DDL, - // which will use `do` as Domain instead of call `domap.Get`. - // That's because `domap.Get` requires a lock, but before - // we initialize Domain finish, we can't require that again. - // After we remove the lazy logic of creating Domain, we - // can simplify code here. - sysFac := func() (pools.Resource, error) { - return sysExecutorFactory(do) - } - sysCtxPool := pools.NewResourcePool(sysFac, 512, 512, resourceIdleTimeout) - ctx, cancelFunc := context.WithCancel(context.Background()) + do.ctx = ctx do.cancelFns.mu.Lock() do.cancelFns.fns = append(do.cancelFns.fns, cancelFunc) do.cancelFns.mu.Unlock() @@ -1371,9 +1373,6 @@ func (do *Domain) Init( } do.isLostConnectionToPD.Store(0) } - - do.wg.Add(1) - go do.serverIDKeeper() } else { // set serverID for standalone deployment to enable 'KILL'. atomic.StoreUint64(&do.serverID, serverIDForStandalone) @@ -1395,19 +1394,6 @@ func (do *Domain) Init( return err } - // step 4: start the ddl after the domain reload, avoiding some internal sql running before infoSchema construction. - err = do.ddl.Start(sysCtxPool) - if err != nil { - return err - } - do.minJobIDRefresher = do.ddl.GetMinJobIDRefresher() - - // TODO there are many place set ddlLease to 0, remove them completely, we want - // UT and even local uni-store to run similar code path as normal. - if ddlLease == 0 { - ddlLease = time.Second - } - sub := time.Since(startReloadTime) // The reload(in step 2) operation takes more than ddlLease and a new reload operation was not performed, // the next query will respond by ErrInfoSchemaExpired error. So we do a new reload to update schemaValidator.latestSchemaExpire. @@ -1418,10 +1404,39 @@ func (do *Domain) Init( return err } } + return nil +} + +// Start starts the domain. After start, DDLs can be executed using session, see +// Init also. +func (do *Domain) Start() error { + gCfg := config.GetGlobalConfig() + if gCfg.EnableGlobalKill && do.etcdClient != nil { + do.wg.Add(1) + go do.serverIDKeeper() + } + + // TODO: Here we create new sessions with sysFac in DDL, + // which will use `do` as Domain instead of call `domap.Get`. + // That's because `domap.Get` requires a lock, but before + // we initialize Domain finish, we can't require that again. + // After we remove the lazy logic of creating Domain, we + // can simplify code here. + sysFac := func() (pools.Resource, error) { + return do.sysExecutorFactory(do) + } + sysCtxPool := pools.NewResourcePool(sysFac, 512, 512, resourceIdleTimeout) + + // start the ddl after the domain reload, avoiding some internal sql running before infoSchema construction. + err := do.ddl.Start(sysCtxPool) + if err != nil { + return err + } + do.minJobIDRefresher = do.ddl.GetMinJobIDRefresher() // Local store needs to get the change information for every DDL state in each session. do.wg.Run(func() { - do.loadSchemaInLoop(ctx, ddlLease) + do.loadSchemaInLoop(do.ctx, do.ddl.GetLease()) }, "loadSchemaInLoop") do.wg.Run(do.mdlCheckLoop, "mdlCheckLoop") do.wg.Run(do.topNSlowQueryLoop, "topNSlowQueryLoop") @@ -1429,16 +1444,18 @@ func (do *Domain) Init( do.wg.Run(do.globalConfigSyncerKeeper, "globalConfigSyncerKeeper") do.wg.Run(do.runawayStartLoop, "runawayStartLoop") do.wg.Run(do.requestUnitsWriterLoop, "requestUnitsWriterLoop") + skipRegisterToDashboard := gCfg.SkipRegisterToDashboard if !skipRegisterToDashboard { do.wg.Run(do.topologySyncerKeeper, "topologySyncerKeeper") } + pdCli := do.GetPDClient() if pdCli != nil { do.wg.Run(func() { - do.closestReplicaReadCheckLoop(ctx, pdCli) + do.closestReplicaReadCheckLoop(do.ctx, pdCli) }, "closestReplicaReadCheckLoop") } - err = do.initLogBackup(ctx, pdCli) + err = do.initLogBackup(do.ctx, pdCli) if err != nil { return err } diff --git a/pkg/domain/domain_test.go b/pkg/domain/domain_test.go index bd064e26f34a1..543f485743510 100644 --- a/pkg/domain/domain_test.go +++ b/pkg/domain/domain_test.go @@ -94,6 +94,7 @@ func TestInfo(t *testing.T) { require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/domain/MockReplaceDDL", `return(true)`)) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/NoDDLDispatchLoop", `return(true)`)) require.NoError(t, dom.Init(ddlLease, sysMockFactory, nil)) + require.NoError(t, dom.Start()) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/NoDDLDispatchLoop")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/domain/MockReplaceDDL")) diff --git a/pkg/expression/context/context.go b/pkg/expression/context/context.go index 8079d4205c40a..7986807118270 100644 --- a/pkg/expression/context/context.go +++ b/pkg/expression/context/context.go @@ -231,6 +231,6 @@ func AssertLocationWithSessionVars(ctxLoc *time.Location, vars *variable.Session stmtLocStr := vars.StmtCtx.TimeZone().String() intest.Assert(ctxLocStr == varsLocStr && ctxLocStr == stmtLocStr, "location mismatch, ctxLoc: %s, varsLoc: %s, stmtLoc: %s", - ctxLoc.String(), ctxLocStr, stmtLocStr, + ctxLocStr, varsLocStr, stmtLocStr, ) } diff --git a/pkg/session/bootstrap_test.go b/pkg/session/bootstrap_test.go index 18d73f7928b25..0987adb382ddd 100644 --- a/pkg/session/bootstrap_test.go +++ b/pkg/session/bootstrap_test.go @@ -170,6 +170,7 @@ func TestBootstrapWithError(t *testing.T) { require.NoError(t, err) dom, err := domap.Get(store) require.NoError(t, err) + require.NoError(t, dom.Start()) domain.BindDomain(se, dom) b, err := checkBootstrapped(se) require.False(t, b) @@ -497,7 +498,6 @@ func TestANSISQLMode(t *testing.T) { // Do some clean up, BootstrapSession will not create a new domain otherwise. dom.Close() - domap.Delete(store) // Set ANSI sql_mode and bootstrap again, to cover a bugfix. // Once we have a SQL like that: @@ -1106,7 +1106,7 @@ func TestTiDBCostModelInNewCluster(t *testing.T) { func TestTiDBCostModelUpgradeFrom300To650(t *testing.T) { ctx := context.Background() - store, _ := CreateStoreAndBootstrap(t) + store, dom := CreateStoreAndBootstrap(t) defer func() { require.NoError(t, store.Close()) }() // Upgrade from 3.0.0 to 6.5+. @@ -1134,6 +1134,7 @@ func TestTiDBCostModelUpgradeFrom300To650(t *testing.T) { require.NoError(t, err) require.Equal(t, 0, chk.NumRows()) + dom.Close() domCurVer, err := BootstrapSession(store) require.NoError(t, err) defer domCurVer.Close() @@ -1225,7 +1226,7 @@ func TestTiDBCostModelUpgradeFrom610To650(t *testing.T) { func TestTiDBGCAwareUpgradeFrom630To650(t *testing.T) { ctx := context.Background() - store, _ := CreateStoreAndBootstrap(t) + store, dom := CreateStoreAndBootstrap(t) defer func() { require.NoError(t, store.Close()) }() // upgrade from 6.3 to 6.5+. @@ -1257,6 +1258,7 @@ func TestTiDBGCAwareUpgradeFrom630To650(t *testing.T) { require.Equal(t, "1", row.GetString(1)) // Upgrade to 6.5. + dom.Close() domCurVer, err := BootstrapSession(store) require.NoError(t, err) defer domCurVer.Close() @@ -1278,7 +1280,7 @@ func TestTiDBGCAwareUpgradeFrom630To650(t *testing.T) { func TestTiDBServerMemoryLimitUpgradeTo651_1(t *testing.T) { ctx := context.Background() - store, _ := CreateStoreAndBootstrap(t) + store, dom := CreateStoreAndBootstrap(t) defer func() { require.NoError(t, store.Close()) }() // upgrade from 6.5.0 to 6.5.1+. @@ -1310,6 +1312,7 @@ func TestTiDBServerMemoryLimitUpgradeTo651_1(t *testing.T) { require.Equal(t, "0", row.GetString(1)) // Upgrade to 6.5.1+. + dom.Close() domCurVer, err := BootstrapSession(store) require.NoError(t, err) defer domCurVer.Close() @@ -1331,7 +1334,7 @@ func TestTiDBServerMemoryLimitUpgradeTo651_1(t *testing.T) { func TestTiDBServerMemoryLimitUpgradeTo651_2(t *testing.T) { ctx := context.Background() - store, _ := CreateStoreAndBootstrap(t) + store, dom := CreateStoreAndBootstrap(t) defer func() { require.NoError(t, store.Close()) }() // upgrade from 6.5.0 to 6.5.1+. @@ -1363,6 +1366,7 @@ func TestTiDBServerMemoryLimitUpgradeTo651_2(t *testing.T) { require.Equal(t, "70%", row.GetString(1)) // Upgrade to 6.5.1+. + dom.Close() domCurVer, err := BootstrapSession(store) require.NoError(t, err) defer domCurVer.Close() @@ -1384,7 +1388,7 @@ func TestTiDBServerMemoryLimitUpgradeTo651_2(t *testing.T) { func TestTiDBGlobalVariablesDefaultValueUpgradeFrom630To660(t *testing.T) { ctx := context.Background() - store, _ := CreateStoreAndBootstrap(t) + store, dom := CreateStoreAndBootstrap(t) defer func() { require.NoError(t, store.Close()) }() // upgrade from 6.3.0 to 6.6.0. @@ -1423,6 +1427,7 @@ func TestTiDBGlobalVariablesDefaultValueUpgradeFrom630To660(t *testing.T) { } // Upgrade to 6.6.0. + dom.Close() domCurVer, err := BootstrapSession(store) require.NoError(t, err) defer domCurVer.Close() @@ -1516,7 +1521,7 @@ func TestTiDBStoreBatchSizeUpgradeFrom650To660(t *testing.T) { } func TestTiDBUpgradeToVer136(t *testing.T) { - store, _ := CreateStoreAndBootstrap(t) + store, do := CreateStoreAndBootstrap(t) defer func() { require.NoError(t, store.Close()) }() @@ -1543,6 +1548,7 @@ func TestTiDBUpgradeToVer136(t *testing.T) { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/reorgMetaRecordFastReorgDisabled")) }) MustExec(t, seV135, "set global tidb_ddl_enable_fast_reorg = 1") + do.Close() dom, err := BootstrapSession(store) require.NoError(t, err) ver, err = getBootstrapVersion(seV135) @@ -1554,7 +1560,7 @@ func TestTiDBUpgradeToVer136(t *testing.T) { } func TestTiDBUpgradeToVer140(t *testing.T) { - store, _ := CreateStoreAndBootstrap(t) + store, do := CreateStoreAndBootstrap(t) defer func() { require.NoError(t, store.Close()) }() @@ -1580,6 +1586,7 @@ func TestTiDBUpgradeToVer140(t *testing.T) { s := CreateSessionAndSetID(t, store) MustExec(t, s, "alter table mysql.tidb_global_task drop column task_key") resetTo139(s) + do.Close() dom, err := BootstrapSession(store) require.NoError(t, err) ver, err := getBootstrapVersion(s) @@ -1600,7 +1607,7 @@ func TestTiDBUpgradeToVer140(t *testing.T) { func TestTiDBNonPrepPlanCacheUpgradeFrom540To700(t *testing.T) { ctx := context.Background() - store, _ := CreateStoreAndBootstrap(t) + store, dom := CreateStoreAndBootstrap(t) defer func() { require.NoError(t, store.Close()) }() // bootstrap to 5.4 @@ -1629,6 +1636,7 @@ func TestTiDBNonPrepPlanCacheUpgradeFrom540To700(t *testing.T) { require.Equal(t, 0, chk.NumRows()) // Upgrade to 7.0 + dom.Close() domCurVer, err := BootstrapSession(store) require.NoError(t, err) defer domCurVer.Close() @@ -1659,7 +1667,7 @@ func TestTiDBNonPrepPlanCacheUpgradeFrom540To700(t *testing.T) { func TestTiDBStatsLoadPseudoTimeoutUpgradeFrom610To650(t *testing.T) { ctx := context.Background() - store, _ := CreateStoreAndBootstrap(t) + store, dom := CreateStoreAndBootstrap(t) defer func() { require.NoError(t, store.Close()) }() // upgrade from 6.1 to 6.5+. @@ -1691,6 +1699,7 @@ func TestTiDBStatsLoadPseudoTimeoutUpgradeFrom610To650(t *testing.T) { require.Equal(t, "0", row.GetString(1)) // Upgrade to 6.5. + dom.Close() domCurVer, err := BootstrapSession(store) require.NoError(t, err) defer domCurVer.Close() @@ -1712,7 +1721,7 @@ func TestTiDBStatsLoadPseudoTimeoutUpgradeFrom610To650(t *testing.T) { func TestTiDBTiDBOptTiDBOptimizerEnableNAAJWhenUpgradingToVer138(t *testing.T) { ctx := context.Background() - store, _ := CreateStoreAndBootstrap(t) + store, dom := CreateStoreAndBootstrap(t) defer func() { require.NoError(t, store.Close()) }() ver137 := version137 @@ -1742,6 +1751,7 @@ func TestTiDBTiDBOptTiDBOptimizerEnableNAAJWhenUpgradingToVer138(t *testing.T) { require.Equal(t, "OFF", row.GetString(1)) // Upgrade to version 138. + dom.Close() domCurVer, err := BootstrapSession(store) require.NoError(t, err) defer domCurVer.Close() @@ -1761,7 +1771,7 @@ func TestTiDBTiDBOptTiDBOptimizerEnableNAAJWhenUpgradingToVer138(t *testing.T) { } func TestTiDBUpgradeToVer143(t *testing.T) { - store, _ := CreateStoreAndBootstrap(t) + store, do := CreateStoreAndBootstrap(t) defer func() { require.NoError(t, store.Close()) }() @@ -1782,6 +1792,7 @@ func TestTiDBUpgradeToVer143(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(ver142), ver) + do.Close() dom, err := BootstrapSession(store) require.NoError(t, err) ver, err = getBootstrapVersion(seV142) @@ -1792,7 +1803,7 @@ func TestTiDBUpgradeToVer143(t *testing.T) { func TestTiDBLoadBasedReplicaReadThresholdUpgradingToVer141(t *testing.T) { ctx := context.Background() - store, _ := CreateStoreAndBootstrap(t) + store, do := CreateStoreAndBootstrap(t) defer func() { require.NoError(t, store.Close()) }() // upgrade from 7.0 to 7.1. @@ -1824,6 +1835,7 @@ func TestTiDBLoadBasedReplicaReadThresholdUpgradingToVer141(t *testing.T) { require.Equal(t, "0", row.GetString(1)) // Upgrade to 7.1. + do.Close() domCurVer, err := BootstrapSession(store) require.NoError(t, err) defer domCurVer.Close() @@ -1845,7 +1857,7 @@ func TestTiDBLoadBasedReplicaReadThresholdUpgradingToVer141(t *testing.T) { func TestTiDBPlanCacheInvalidationOnFreshStatsWhenUpgradingToVer144(t *testing.T) { ctx := context.Background() - store, _ := CreateStoreAndBootstrap(t) + store, do := CreateStoreAndBootstrap(t) defer func() { require.NoError(t, store.Close()) }() // bootstrap as version143 @@ -1864,6 +1876,7 @@ func TestTiDBPlanCacheInvalidationOnFreshStatsWhenUpgradingToVer144(t *testing.T unsetStoreBootstrapped(store.UUID()) // upgrade to ver144 + do.Close() domCurVer, err := BootstrapSession(store) require.NoError(t, err) defer domCurVer.Close() @@ -1891,7 +1904,7 @@ func TestTiDBPlanCacheInvalidationOnFreshStatsWhenUpgradingToVer144(t *testing.T } func TestTiDBUpgradeToVer145(t *testing.T) { - store, _ := CreateStoreAndBootstrap(t) + store, do := CreateStoreAndBootstrap(t) defer func() { require.NoError(t, store.Close()) }() @@ -1912,6 +1925,7 @@ func TestTiDBUpgradeToVer145(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(ver144), ver) + do.Close() dom, err := BootstrapSession(store) require.NoError(t, err) ver, err = getBootstrapVersion(seV144) @@ -1921,7 +1935,7 @@ func TestTiDBUpgradeToVer145(t *testing.T) { } func TestTiDBUpgradeToVer170(t *testing.T) { - store, _ := CreateStoreAndBootstrap(t) + store, do := CreateStoreAndBootstrap(t) defer func() { require.NoError(t, store.Close()) }() @@ -1941,6 +1955,7 @@ func TestTiDBUpgradeToVer170(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(ver169), ver) + do.Close() dom, err := BootstrapSession(store) require.NoError(t, err) ver, err = getBootstrapVersion(seV169) @@ -1951,7 +1966,7 @@ func TestTiDBUpgradeToVer170(t *testing.T) { func TestTiDBBindingInListToVer175(t *testing.T) { ctx := context.Background() - store, _ := CreateStoreAndBootstrap(t) + store, dom := CreateStoreAndBootstrap(t) defer func() { require.NoError(t, store.Close()) }() // bootstrap as version174 @@ -2003,6 +2018,7 @@ func TestTiDBBindingInListToVer175(t *testing.T) { "SELECT /*+ use_index(`t` `c`)*/ * FROM `test`.`t` WHERE `a` IN (1,2,3):select * from `test` . `t` where `a` in ( ... )"}, bindings) // upgrade to ver175 + dom.Close() domCurVer, err := BootstrapSession(store) require.NoError(t, err) defer domCurVer.Close() @@ -2033,7 +2049,7 @@ func TestTiDBBindingInListToVer175(t *testing.T) { } func TestTiDBUpgradeToVer176(t *testing.T) { - store, _ := CreateStoreAndBootstrap(t) + store, do := CreateStoreAndBootstrap(t) defer func() { require.NoError(t, store.Close()) }() @@ -2053,6 +2069,7 @@ func TestTiDBUpgradeToVer176(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(ver175), ver) + do.Close() dom, err := BootstrapSession(store) require.NoError(t, err) ver, err = getBootstrapVersion(seV175) @@ -2063,7 +2080,7 @@ func TestTiDBUpgradeToVer176(t *testing.T) { } func TestTiDBUpgradeToVer177(t *testing.T) { - store, _ := CreateStoreAndBootstrap(t) + store, do := CreateStoreAndBootstrap(t) defer func() { require.NoError(t, store.Close()) }() @@ -2083,6 +2100,7 @@ func TestTiDBUpgradeToVer177(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(ver176), ver) + do.Close() dom, err := BootstrapSession(store) require.NoError(t, err) ver, err = getBootstrapVersion(seV176) @@ -2117,7 +2135,7 @@ func TestWriteDDLTableVersionToMySQLTiDB(t *testing.T) { func TestWriteDDLTableVersionToMySQLTiDBWhenUpgradingTo178(t *testing.T) { ctx := context.Background() - store, _ := CreateStoreAndBootstrap(t) + store, dom := CreateStoreAndBootstrap(t) defer func() { require.NoError(t, store.Close()) }() txn, err := store.Begin() @@ -2142,6 +2160,7 @@ func TestWriteDDLTableVersionToMySQLTiDBWhenUpgradingTo178(t *testing.T) { require.Equal(t, int64(ver177), ver) // upgrade to current version + dom.Close() domCurVer, err := BootstrapSession(store) require.NoError(t, err) defer domCurVer.Close() @@ -2162,7 +2181,7 @@ func TestWriteDDLTableVersionToMySQLTiDBWhenUpgradingTo178(t *testing.T) { func TestTiDBUpgradeToVer179(t *testing.T) { ctx := context.Background() - store, _ := CreateStoreAndBootstrap(t) + store, do := CreateStoreAndBootstrap(t) defer func() { require.NoError(t, store.Close()) }() @@ -2182,6 +2201,7 @@ func TestTiDBUpgradeToVer179(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(ver178), ver) + do.Close() dom, err := BootstrapSession(store) require.NoError(t, err) ver, err = getBootstrapVersion(seV178) @@ -2200,7 +2220,7 @@ func TestTiDBUpgradeToVer179(t *testing.T) { } func testTiDBUpgradeWithDistTask(t *testing.T, injectQuery string, fatal bool) { - store, _ := CreateStoreAndBootstrap(t) + store, do := CreateStoreAndBootstrap(t) defer func() { require.NoError(t, store.Close()) }() @@ -2229,8 +2249,7 @@ func testTiDBUpgradeWithDistTask(t *testing.T, injectQuery string, fatal bool) { rs() }() - var dom *domain.Domain - + do.Close() fatal2panic := false fc := func() { defer func() { @@ -2238,20 +2257,19 @@ func testTiDBUpgradeWithDistTask(t *testing.T, injectQuery string, fatal bool) { fatal2panic = true } }() - dom, _ = BootstrapSession(store) + _, _ = BootstrapSession(store) } fc() - - if fatal { - dom = domain.GetDomain(seV178) - } + var dom *domain.Domain + dom, err = domap.Get(store) + require.NoError(t, err) dom.Close() require.Equal(t, fatal, fatal2panic) } func TestTiDBUpgradeToVer209(t *testing.T) { ctx := context.Background() - store, _ := CreateStoreAndBootstrap(t) + store, dom := CreateStoreAndBootstrap(t) defer func() { require.NoError(t, store.Close()) }() // bootstrap as version198, version 199~208 is reserved for v8.1.x bugfix patch. @@ -2270,6 +2288,7 @@ func TestTiDBUpgradeToVer209(t *testing.T) { unsetStoreBootstrapped(store.UUID()) // upgrade to ver209 + dom.Close() domCurVer, err := BootstrapSession(store) require.NoError(t, err) defer domCurVer.Close() @@ -2324,7 +2343,7 @@ func TestTiDBUpgradeWithDistTaskRunning(t *testing.T) { func TestTiDBUpgradeToVer211(t *testing.T) { ctx := context.Background() - store, _ := CreateStoreAndBootstrap(t) + store, do := CreateStoreAndBootstrap(t) defer func() { require.NoError(t, store.Close()) }() @@ -2345,12 +2364,14 @@ func TestTiDBUpgradeToVer211(t *testing.T) { require.Equal(t, int64(ver210), ver) MustExec(t, seV210, "alter table mysql.tidb_background_subtask_history drop column summary;") + do.Close() dom, err := BootstrapSession(store) require.NoError(t, err) ver, err = getBootstrapVersion(seV210) require.NoError(t, err) require.Less(t, int64(ver210), ver) + domain.BindDomain(seV210, dom) r := MustExecToRecodeSet(t, seV210, "select count(summary) from mysql.tidb_background_subtask_history;") req := r.NewChunk(nil) err = r.Next(ctx, req) @@ -2379,7 +2400,7 @@ func TestTiDBHistoryTableConsistent(t *testing.T) { row := req.GetRow(0) require.Equal(t, int64(1), row.GetInt64(0)) - query = `select (select group_concat(column_name) from information_schema.columns where table_name='tidb_global_task' order by ordinal_position) + query = `select (select group_concat(column_name) from information_schema.columns where table_name='tidb_global_task' order by ordinal_position) = (select group_concat(column_name) from information_schema.columns where table_name='tidb_global_task_history' order by ordinal_position);` r = MustExecToRecodeSet(t, se, query) req = r.NewChunk(nil) diff --git a/pkg/session/bootstraptest/bootstrap_upgrade_test.go b/pkg/session/bootstraptest/bootstrap_upgrade_test.go index ef5565fb97c56..3713d0cf6a6e1 100644 --- a/pkg/session/bootstraptest/bootstrap_upgrade_test.go +++ b/pkg/session/bootstraptest/bootstrap_upgrade_test.go @@ -257,8 +257,8 @@ func TestUpgradeVersionMockLatest(t *testing.T) { ver, err := session.GetBootstrapVersion(seV) require.NoError(t, err) require.Equal(t, session.CurrentBootstrapVersion-1, ver) - dom.Close() startUpgrade(store) + dom.Close() domLatestV, err := session.BootstrapSession(store) require.NoError(t, err) defer domLatestV.Close() @@ -322,7 +322,6 @@ func TestUpgradeVersionWithUpgradeHTTPOp(t *testing.T) { ver, err := session.GetBootstrapVersion(seV) require.NoError(t, err) require.Equal(t, session.SupportUpgradeHTTPOpVer, ver) - dom.Close() // Start the upgrade test. // Current cluster state is normal. @@ -331,6 +330,8 @@ func TestUpgradeVersionWithUpgradeHTTPOp(t *testing.T) { require.Equal(t, false, isUpgrading) upgradeHandler := handler.NewClusterUpgradeHandler(store) upgradeHandler.StartUpgrade() + + dom.Close() domLatestV, err := session.BootstrapSession(store) require.NoError(t, err) defer domLatestV.Close() @@ -434,9 +435,9 @@ func TestUpgradeVersionForPausedJob(t *testing.T) { }() <-ch - dom.Close() // Make sure upgrade is successful. startUpgrade(store) + dom.Close() domLatestV, err := session.BootstrapSession(store) require.NoError(t, err) defer domLatestV.Close() @@ -525,9 +526,9 @@ func TestUpgradeVersionForSystemPausedJob(t *testing.T) { }() <-ch - dom.Close() // Make sure upgrade is successful. startUpgrade(store) + dom.Close() domLatestV, err := session.BootstrapSession(store) require.NoError(t, err) defer domLatestV.Close() @@ -587,9 +588,9 @@ func TestUpgradeVersionForResumeJob(t *testing.T) { }() <-ch - dom.Close() // Make sure upgrade is successful. startUpgrade(store) + dom.Close() domLatestV, err := session.BootstrapSession(store) require.NoError(t, err) defer domLatestV.Close() @@ -765,8 +766,8 @@ func TestUpgradeWithPauseDDL(t *testing.T) { ver, err := session.GetBootstrapVersion(seV) require.NoError(t, err) require.Equal(t, session.CurrentBootstrapVersion-1, ver) - dom.Close() startUpgrade(store) + dom.Close() domLatestV, err := session.BootstrapSession(store) require.NoError(t, err) defer domLatestV.Close() diff --git a/pkg/session/session.go b/pkg/session/session.go index 8eeebd640392b..872835087e920 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -3404,6 +3404,18 @@ func BootstrapSession4DistExecution(store kv.Storage) (*domain.Domain, error) { return bootstrapSessionImpl(store, createSessions4DistExecution) } +// bootstrapSessionImpl bootstraps session and domain. +// the process works as follows: +// - if we haven't bootstrapped to the target version +// - create/init/start domain +// - bootstrap or upgrade, some variables will be initialized and stored to system +// table in the process, such as system time-zone +// - close domain +// +// - create/init another domain +// - initialization global variables from system table that's required to use sessionCtx, +// such as system time zone +// - start domain and other routines. func bootstrapSessionImpl(store kv.Storage, createSessionsImpl func(store kv.Storage, cnt int) ([]*session, error)) (*domain.Domain, error) { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBootstrap) cfg := config.GetGlobalConfig() @@ -3487,10 +3499,16 @@ func bootstrapSessionImpl(store kv.Storage, createSessionsImpl func(store kv.Sto return nil, err } collate.SetNewCollationEnabledForTest(newCollationEnabled) - // To deal with the location partition failure caused by inconsistent NewCollationEnabled values(see issue #32416). - rebuildAllPartitionValueMapAndSorted(ses[0]) + // only start the domain after we have initialized some global variables. dom := domain.GetDomain(ses[0]) + err = dom.Start() + if err != nil { + return nil, err + } + + // To deal with the location partition failure caused by inconsistent NewCollationEnabled values(see issue #32416). + rebuildAllPartitionValueMapAndSorted(ses[0]) // We should make the load bind-info loop before other loops which has internal SQL. // Because the internal SQL may access the global bind-info handler. As the result, the data race occurs here as the @@ -3667,6 +3685,13 @@ func runInBootstrapSession(store kv.Storage, bootstrap func(types.Session)) { // Bootstrap fail will cause program exit. logutil.BgLogger().Fatal("createSession error", zap.Error(err)) } + dom := domain.GetDomain(s) + err = dom.Start() + if err != nil { + // Bootstrap fail will cause program exit. + logutil.BgLogger().Fatal("start domain error", zap.Error(err)) + } + // For the bootstrap SQLs, the following variables should be compatible with old TiDB versions. s.sessionVars.EnableClusteredIndex = variable.ClusteredIndexDefModeIntOnly @@ -3675,7 +3700,6 @@ func runInBootstrapSession(store kv.Storage, bootstrap func(types.Session)) { finishBootstrap(store) s.ClearValue(sessionctx.Initing) - dom := domain.GetDomain(s) dom.Close() if intest.InTest { infosync.MockGlobalServerInfoManagerEntry.Close() diff --git a/pkg/session/tidb.go b/pkg/session/tidb.go index 5e5fadbdcf3a8..a59529dc4669a 100644 --- a/pkg/session/tidb.go +++ b/pkg/session/tidb.go @@ -56,6 +56,9 @@ type domainMap struct { domains map[string]*domain.Domain } +// Get or create the domain for store. +// TODO decouple domain create from it, it's more clear to create domain explicitly +// before any usage of it. func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) { dm.mu.Lock() defer dm.mu.Unlock()