diff --git a/br/pkg/gluetidb/glue.go b/br/pkg/gluetidb/glue.go index cf1a876755c99..a86ef312d1804 100644 --- a/br/pkg/gluetidb/glue.go +++ b/br/pkg/gluetidb/glue.go @@ -111,7 +111,7 @@ func (g Glue) startDomainAsNeeded(store kv.Storage) error { if err != nil { return err } - return dom.Start() + return dom.Start(ddl.Normal) } func (g Glue) createTypesSession(store kv.Storage) (sessiontypes.Session, error) { diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index e82338bd2afe1..ad9d87bed4d7a 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -93,6 +93,20 @@ var ( detectJobVerInterval = 10 * time.Second ) +// StartMode is an enum type for the start mode of the DDL. +type StartMode string + +const ( + // Normal mode, cluster is in normal state. + Normal StartMode = "normal" + // Bootstrap mode, cluster is during bootstrap. + Bootstrap StartMode = "bootstrap" + // Upgrade mode, cluster is during upgrade, we will force current node to be + // the DDL owner, to make sure all upgrade related DDLs are run on new version + // TiDB instance. + Upgrade StartMode = "upgrade" +) + // OnExist specifies what to do when a new object has a name collision. type OnExist uint8 @@ -161,7 +175,7 @@ var ( type DDL interface { // Start campaigns the owner and starts workers. // ctxPool is used for the worker's delRangeManager and creates sessions. - Start(ctxPool *pools.ResourcePool) error + Start(startMode StartMode, ctxPool *pools.ResourcePool) error // Stats returns the DDL statistics. Stats(vars *variable.SessionVars) (map[string]any, error) // GetScope gets the status variables scope. @@ -748,10 +762,21 @@ func (d *ddl) newDeleteRangeManager(mock bool) delRangeManager { } // Start implements DDL.Start interface. -func (d *ddl) Start(ctxPool *pools.ResourcePool) error { +func (d *ddl) Start(startMode StartMode, ctxPool *pools.ResourcePool) error { d.detectAndUpdateJobVersion() + campaignOwner := config.GetGlobalConfig().Instance.TiDBEnableDDL.Load() + if startMode == Upgrade { + if !campaignOwner { + return errors.New("DDL must be enabled when upgrading") + } + + logutil.DDLLogger().Info("DDL is in upgrade mode, force to be owner") + if err := d.ownerManager.ForceToBeOwner(d.ctx); err != nil { + return errors.Trace(err) + } + } logutil.DDLLogger().Info("start DDL", zap.String("ID", d.uuid), - zap.Bool("runWorker", config.GetGlobalConfig().Instance.TiDBEnableDDL.Load()), + zap.Bool("runWorker", campaignOwner), zap.Stringer("jobVersion", model.GetJobVerInUse())) d.sessPool = sess.NewSessionPool(ctxPool) @@ -786,7 +811,7 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error { // If tidb_enable_ddl is true, we need campaign owner and do DDL jobs. Besides, we also can do backfill jobs. // Otherwise, we needn't do that. 
- if config.GetGlobalConfig().Instance.TiDBEnableDDL.Load() { + if campaignOwner { if err := d.EnableDDL(); err != nil { return err } diff --git a/pkg/ddl/restart_test.go b/pkg/ddl/restart_test.go index 3009111af511f..30935643db1bb 100644 --- a/pkg/ddl/restart_test.go +++ b/pkg/ddl/restart_test.go @@ -50,7 +50,7 @@ func restartWorkers(t *testing.T, store kv.Storage, d *domain.Domain) { ddl.WithSchemaLoader(d), ) d.SetDDL(newDDL, newDDLExecutor) - err = newDDL.Start(pools.NewResourcePool(func() (pools.Resource, error) { + err = newDDL.Start(ddl.Normal, pools.NewResourcePool(func() (pools.Resource, error) { session := testkit.NewTestKit(t, store).Session() session.GetSessionVars().CommonGlobalLoaded = true return session, nil diff --git a/pkg/ddl/schema_test.go b/pkg/ddl/schema_test.go index 2ff375b76a7c9..7132fabe8c956 100644 --- a/pkg/ddl/schema_test.go +++ b/pkg/ddl/schema_test.go @@ -307,7 +307,7 @@ func TestSchemaWaitJob(t *testing.T) { ddl.WithSchemaLoader(domain), ) det2 := de2.(ddl.ExecutorForTest) - err := d2.Start(pools.NewResourcePool(func() (pools.Resource, error) { + err := d2.Start(ddl.Normal, pools.NewResourcePool(func() (pools.Resource, error) { session := testkit.NewTestKit(t, store).Session() session.GetSessionVars().CommonGlobalLoaded = true return session, nil diff --git a/pkg/ddl/schematracker/checker.go b/pkg/ddl/schematracker/checker.go index 1c666a026d352..7068d4bce229b 100644 --- a/pkg/ddl/schematracker/checker.go +++ b/pkg/ddl/schematracker/checker.go @@ -502,8 +502,8 @@ func (*Checker) CreatePlacementPolicyWithInfo(_ sessionctx.Context, _ *model.Pol } // Start implements the DDL interface. -func (d *Checker) Start(ctxPool *pools.ResourcePool) error { - return d.realDDL.Start(ctxPool) +func (d *Checker) Start(startMode ddl.StartMode, ctxPool *pools.ResourcePool) error { + return d.realDDL.Start(startMode, ctxPool) } // Stats implements the DDL interface. diff --git a/pkg/domain/BUILD.bazel b/pkg/domain/BUILD.bazel index 296bacaf25c8d..fea17d5c870e1 100644 --- a/pkg/domain/BUILD.bazel +++ b/pkg/domain/BUILD.bazel @@ -136,7 +136,7 @@ go_test( ], embed = [":domain"], flaky = True, - shard_count = 30, + shard_count = 31, deps = [ "//pkg/config", "//pkg/ddl", diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index 91a7b75044ff4..c845b07497773 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -1311,6 +1311,37 @@ func NewDomain(store kv.Storage, schemaLease time.Duration, statsLease time.Dura const serverIDForStandalone = 1 // serverID for standalone deployment. +// NewEtcdCli creates a new clientv3.Client from the store if the store supports it. +// The returned client might be nil. +// TODO: currently uni-store/mock-tikv/tikv all implement EtcdBackend even though they don't actually support it; +// refactor this part.
+func NewEtcdCli(store kv.Storage) (*clientv3.Client, error) { + etcdStore, addrs, err := getEtcdAddrs(store) + if err != nil { + return nil, err + } + if len(addrs) == 0 { + return nil, nil + } + cli, err := newEtcdCli(addrs, etcdStore) + if err != nil { + return nil, err + } + return cli, nil +} + +func getEtcdAddrs(store kv.Storage) (kv.EtcdBackend, []string, error) { + etcdStore, ok := store.(kv.EtcdBackend) + if !ok { + return nil, nil, nil + } + addrs, err := etcdStore.EtcdAddrs() + if err != nil { + return nil, nil, err + } + return etcdStore, addrs, nil +} + func newEtcdCli(addrs []string, ebd kv.EtcdBackend) (*clientv3.Client, error) { cfg := config.GetGlobalConfig() etcdLogCfg := zap.NewProductionConfig() @@ -1344,30 +1375,26 @@ func (do *Domain) Init( ) error { do.sysExecutorFactory = sysExecutorFactory perfschema.Init() - if ebd, ok := do.store.(kv.EtcdBackend); ok { - var addrs []string - var err error - if addrs, err = ebd.EtcdAddrs(); err != nil { - return err + etcdStore, addrs, err := getEtcdAddrs(do.store) + if err != nil { + return errors.Trace(err) + } + if len(addrs) > 0 { + cli, err2 := newEtcdCli(addrs, etcdStore) + if err2 != nil { + return errors.Trace(err2) } - if addrs != nil { - cli, err := newEtcdCli(addrs, ebd) - if err != nil { - return errors.Trace(err) - } + etcd.SetEtcdCliByNamespace(cli, keyspace.MakeKeyspaceEtcdNamespace(do.store.GetCodec())) - etcd.SetEtcdCliByNamespace(cli, keyspace.MakeKeyspaceEtcdNamespace(do.store.GetCodec())) + do.etcdClient = cli - do.etcdClient = cli + do.autoidClient = autoid.NewClientDiscover(cli) - do.autoidClient = autoid.NewClientDiscover(cli) - - unprefixedEtcdCli, err := newEtcdCli(addrs, ebd) - if err != nil { - return errors.Trace(err) - } - do.unprefixedEtcdCli = unprefixedEtcdCli + unprefixedEtcdCli, err2 := newEtcdCli(addrs, etcdStore) + if err2 != nil { + return errors.Trace(err2) } + do.unprefixedEtcdCli = unprefixedEtcdCli } ctx, cancelFunc := context.WithCancel(context.Background()) @@ -1418,7 +1445,6 @@ func (do *Domain) Init( // step 1: prepare the info/schema syncer which domain reload needed. pdCli, pdHTTPCli := do.GetPDClient(), do.GetPDHTTPClient() skipRegisterToDashboard := config.GetGlobalConfig().SkipRegisterToDashboard - var err error do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, do.etcdClient, do.unprefixedEtcdCli, pdCli, pdHTTPCli, do.Store().GetCodec(), skipRegisterToDashboard) @@ -1482,7 +1508,7 @@ func (do *Domain) Init( // Start starts the domain. After start, DDLs can be executed using session, see // Init also. -func (do *Domain) Start() error { +func (do *Domain) Start(startMode ddl.StartMode) error { gCfg := config.GetGlobalConfig() if gCfg.EnableGlobalKill && do.etcdClient != nil { do.wg.Add(1) @@ -1501,7 +1527,7 @@ func (do *Domain) Start() error { sysCtxPool := pools.NewResourcePool(sysFac, 512, 512, resourceIdleTimeout) // start the ddl after the domain reload, avoiding some internal sql running before infoSchema construction. 
- err := do.ddl.Start(sysCtxPool) + err := do.ddl.Start(startMode, sysCtxPool) if err != nil { return err } diff --git a/pkg/domain/domain_test.go b/pkg/domain/domain_test.go index a6ac1da87955d..f575d8702b305 100644 --- a/pkg/domain/domain_test.go +++ b/pkg/domain/domain_test.go @@ -94,7 +94,7 @@ func TestInfo(t *testing.T) { require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/domain/MockReplaceDDL", `return(true)`)) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/NoDDLDispatchLoop", `return(true)`)) require.NoError(t, dom.Init(sysMockFactory, nil)) - require.NoError(t, dom.Start()) + require.NoError(t, dom.Start(ddl.Bootstrap)) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/NoDDLDispatchLoop")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/domain/MockReplaceDDL")) @@ -486,3 +486,19 @@ func TestDeferFn(t *testing.T) { require.True(t, d) require.Len(t, df.data, 1) } + +func TestNewEtcdCliGetEtcdAddrs(t *testing.T) { + etcdStore, addrs, err := getEtcdAddrs(nil) + require.NoError(t, err) + require.Empty(t, addrs) + require.Nil(t, etcdStore) + + etcdStore, addrs, err = getEtcdAddrs(&mockEtcdBackend{pdAddrs: []string{"localhost:2379"}}) + require.NoError(t, err) + require.Equal(t, []string{"localhost:2379"}, addrs) + require.NotNil(t, etcdStore) + + cli, err := NewEtcdCli(nil) + require.NoError(t, err) + require.Nil(t, cli) +} diff --git a/pkg/owner/manager.go b/pkg/owner/manager.go index e4d467b16b1db..e6c4ebd4593f5 100644 --- a/pkg/owner/manager.go +++ b/pkg/owner/manager.go @@ -69,10 +69,25 @@ type Manager interface { CampaignCancel() // SetListener sets the listener, set before CampaignOwner. SetListener(listener Listener) + // ForceToBeOwner restarts the owner election and tries to become the new owner by + // ending the campaigns of all candidates and starting a new campaign in a single transaction. + // + // This method is only used during upgrade, to make a node of the newer version + // become the DDL owner and mitigate the issue https://github.com/pingcap/tidb/issues/54689. + // The current instance must not call CampaignOwner before calling this method. + // Don't use it in other cases. + // + // Note: only one instance can call this method at a time, so you have to use + // a distributed lock when there are multiple new-version TiDB instances trying + // to be the owner. See runInBootstrapSession for where we lock it in DDL. + ForceToBeOwner(ctx context.Context) error } const ( keyOpDefaultTimeout = 5 * time.Second + + // WaitTimeOnForceOwner is the time to wait before or after forcing ownership. + WaitTimeOnForceOwner = 5 * time.Second ) // OpType is the owner key value operation type. @@ -120,7 +135,8 @@ type ownerManager struct { wg sync.WaitGroup campaignCancel context.CancelFunc - listener Listener + listener Listener + forceOwnerSession *concurrency.Session } // NewOwnerManager creates a new Manager.
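The ForceToBeOwner doc comment above implies a strict calling order: take a cluster-wide lock so only one new-version node forces ownership, call ForceToBeOwner, and only then CampaignOwner, which reuses the forced session. Below is a standalone sketch of that contract using local stand-in types, not the real pkg/owner API:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// ownerManager is a local stand-in for the two Manager methods relevant here;
// the real interface is pkg/owner.Manager as extended by this diff.
type ownerManager interface {
	ForceToBeOwner(ctx context.Context) error
	CampaignOwner(withTTL ...int) error
}

// upgradeCampaign shows the documented contract: hold a cluster-wide lock,
// call ForceToBeOwner before CampaignOwner, then start the normal campaign loop.
func upgradeCampaign(ctx context.Context, lock sync.Locker, m ownerManager) error {
	lock.Lock() // stands in for the distributed lock taken during bootstrap/upgrade
	defer lock.Unlock()

	if err := m.ForceToBeOwner(ctx); err != nil {
		return fmt.Errorf("force owner: %w", err)
	}
	return m.CampaignOwner()
}

// fakeManager only checks that the ordering above is respected.
type fakeManager struct{ forced bool }

func (f *fakeManager) ForceToBeOwner(context.Context) error { f.forced = true; return nil }

func (f *fakeManager) CampaignOwner(_ ...int) error {
	if !f.forced {
		return fmt.Errorf("campaign started without forcing ownership first")
	}
	return nil
}

func main() {
	var mu sync.Mutex
	fmt.Println(upgradeCampaign(context.Background(), &mu, &fakeManager{})) // <nil>
}
```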
@@ -165,6 +181,96 @@ func (m *ownerManager) SetListener(listener Listener) { m.listener = listener } +func (m *ownerManager) ForceToBeOwner(context.Context) error { + logPrefix := fmt.Sprintf("[%s] %s", m.prompt, m.key) + logutil.BgLogger().Info("force to be owner", zap.String("ownerInfo", logPrefix)) + session, err := util2.NewSession(m.ctx, logPrefix, m.etcdCli, util2.NewSessionDefaultRetryCnt, ManagerSessionTTL) + if err != nil { + return errors.Trace(err) + } + m.forceOwnerSession = session + m.sessionLease.Store(int64(m.forceOwnerSession.Lease())) + + // due to issue https://github.com/pingcap/tidb/issues/54689, if the cluster + // version before the upgrade doesn't have the fix, retiring owners that run on the + // older version and trying to be the new owner can leave multiple owners existing at + // the same time. This cannot be avoided completely, but we use the below 2 strategies + // to mitigate the issue: + // 1. when trying to be the owner, we delete all existing owner-related keys and + // put the new key in a single txn; if we deleted the keys one by one, another node + // might become the owner in between, giving more chances to trigger the issue above. + // 2. sleep for a while before trying to be the owner to make sure there is an owner in + // the cluster, and it has started watching. In the case of an upgrade using + // tiup, tiup might restart the current owner node to do the rolling upgrade. + // Before the restarted node forces ownership, another node might try to be + // the new owner too, so it's still possible to trigger the issue; hence we + // sleep a while to wait until the cluster has a new owner that has started watching. + for i := 0; i < 3; i++ { + // we need to sleep in every retry, as other TiDB nodes will start campaigning + // immediately after we delete their keys. + time.Sleep(WaitTimeOnForceOwner) + if err = m.tryToBeOwnerOnce(); err != nil { + logutil.Logger(m.logCtx).Warn("failed to retire owner on older version", zap.Error(err)) + continue + } + break + } + return nil +} + +func (m *ownerManager) tryToBeOwnerOnce() error { + lease := m.forceOwnerSession.Lease() + keyPrefix := m.key + "/" + + getResp, err := m.etcdCli.Get(m.ctx, keyPrefix, clientv3.WithPrefix()) + if err != nil { + return err + } + + // modifying the same key multiple times within a single transaction is + // forbidden in etcd, so we cannot delete by prefix and put the new key in a single txn; + // that would report a "duplicate key given in txn request" error. + // It's possible that other nodes put campaign keys between our get and + // the txn that puts the new key; we rely on the sleep before calling this method to + // make sure all TiDBs have already put their keys, and on the distributed lock inside + // bootstrap to make sure no concurrent ForceToBeOwner is called. + txnOps := make([]clientv3.Op, 0, len(getResp.Kvs)+1) + // the key structure below is copied from Election.Campaign.
+ campaignKey := fmt.Sprintf("%s%x", keyPrefix, lease) + for _, kv := range getResp.Kvs { + key := string(kv.Key) + if key == campaignKey { + // if below campaign failed, it will resign automatically, but if resign + // also failed, the old key might already exist + continue + } + txnOps = append(txnOps, clientv3.OpDelete(key)) + } + txnOps = append(txnOps, clientv3.OpPut(campaignKey, m.id, clientv3.WithLease(lease))) + _, err = m.etcdCli.Txn(m.ctx).Then(txnOps...).Commit() + if err != nil { + return errors.Trace(err) + } + // Campaign will wait until there is no key with smaller create-revision, either + // current instance become owner or all the keys are deleted, in case other nodes + // put keys in between previous get and txn, and makes current node never become + // the owner, so we add a timeout to avoid blocking. + ctx, cancel := context.WithTimeout(m.ctx, keyOpDefaultTimeout) + defer cancel() + elec := concurrency.NewElection(m.forceOwnerSession, m.key) + if err = elec.Campaign(ctx, m.id); err != nil { + return errors.Trace(err) + } + + // Campaign assumes that it's the only client managing the lifecycle of the campaign + // key, it only checks whether there are any keys with smaller create-revision, + // so it will also return when all the campaign keys are deleted by other TiDB + // instances when the distributed lock has failed to keep alive and another TiDB + // get the lock. It's a quite rare case, and the TiDB must be of newer version + // which has the fix of the issue, so it's ok to return now. + return nil +} + // ManagerSessionTTL is the etcd session's TTL in seconds. It's exported for testing. var ManagerSessionTTL = 60 @@ -190,15 +296,19 @@ func (m *ownerManager) CampaignOwner(withTTL ...int) error { } logPrefix := fmt.Sprintf("[%s] %s", m.prompt, m.key) logutil.BgLogger().Info("start campaign owner", zap.String("ownerInfo", logPrefix)) - session, err := util2.NewSession(m.ctx, logPrefix, m.etcdCli, util2.NewSessionDefaultRetryCnt, ttl) - if err != nil { - return errors.Trace(err) + campaignSession := m.forceOwnerSession + if campaignSession == nil { + session, err := util2.NewSession(m.ctx, logPrefix, m.etcdCli, util2.NewSessionDefaultRetryCnt, ttl) + if err != nil { + return errors.Trace(err) + } + m.sessionLease.Store(int64(session.Lease())) + campaignSession = session } - m.sessionLease.Store(int64(session.Lease())) m.wg.Add(1) var campaignContext context.Context campaignContext, m.campaignCancel = context.WithCancel(m.ctx) - go m.campaignLoop(campaignContext, session) + go m.campaignLoop(campaignContext, campaignSession) return nil } @@ -500,16 +610,6 @@ func init() { } } -// DeleteLeader deletes the leader key. -func DeleteLeader(ctx context.Context, cli *clientv3.Client, key string) error { - ownerKey, _, _, _, _, err := getOwnerInfo(ctx, ctx, cli, key) - if err != nil { - return errors.Trace(err) - } - _, err = cli.Delete(ctx, ownerKey) - return err -} - // AcquireDistributedLock creates a mutex with ETCD client, and returns a mutex release function. 
func AcquireDistributedLock( ctx context.Context, @@ -540,13 +640,13 @@ func AcquireDistributedLock( } return nil, err } - logutil.Logger(ctx).Info("acquire distributed flush lock success", zap.String("key", key)) + logutil.Logger(ctx).Info("acquire distributed lock success", zap.String("key", key)) return func() { err = mu.Unlock(ctx) if err != nil { - logutil.Logger(ctx).Warn("release distributed flush lock error", zap.Error(err), zap.String("key", key)) + logutil.Logger(ctx).Warn("release distributed lock error", zap.Error(err), zap.String("key", key)) } else { - logutil.Logger(ctx).Info("release distributed flush lock success", zap.String("key", key)) + logutil.Logger(ctx).Info("release distributed lock success", zap.String("key", key)) } err = se.Close() if err != nil { diff --git a/pkg/owner/mock.go b/pkg/owner/mock.go index 75a934307b41e..f653b51e0e119 100644 --- a/pkg/owner/mock.go +++ b/pkg/owner/mock.go @@ -181,6 +181,10 @@ func (m *mockManager) SetListener(listener Listener) { m.listener = listener } +func (*mockManager) ForceToBeOwner(context.Context) error { + return nil +} + // CampaignCancel implements Manager.CampaignCancel interface func (m *mockManager) CampaignCancel() { m.campaignDone <- struct{}{} diff --git a/pkg/session/BUILD.bazel b/pkg/session/BUILD.bazel index 34b00407b45d6..51c5b5e23c739 100644 --- a/pkg/session/BUILD.bazel +++ b/pkg/session/BUILD.bazel @@ -144,6 +144,7 @@ go_test( "bench_test.go", "bootstrap_test.go", "main_test.go", + "session_test.go", "tidb_test.go", ], embed = [":session"], diff --git a/pkg/session/bootstrap.go b/pkg/session/bootstrap.go index 25e1fcc1c175a..0d5c5d295abcc 100644 --- a/pkg/session/bootstrap.go +++ b/pkg/session/bootstrap.go @@ -31,7 +31,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/bindinfo" "github.com/pingcap/tidb/pkg/config" - "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/domain/infosync" @@ -1427,47 +1426,31 @@ var ( SupportUpgradeHTTPOpVer int64 = version174 ) -func acquireLock(s sessiontypes.Session) (func(), bool) { - dom := domain.GetDomain(s) - if dom == nil { - logutil.BgLogger().Warn("domain is nil") - return nil, false +func acquireLock(store kv.Storage) (func(), error) { + etcdCli, err := domain.NewEtcdCli(store) + if err != nil { + return nil, errors.Trace(err) } - cli := dom.GetEtcdClient() - if cli == nil { - logutil.BgLogger().Warn("etcd client is nil, force to acquire ddl owner lock") + if etcdCli == nil { // Special handling for test. 
+ logutil.BgLogger().Warn("skip acquire ddl owner lock for uni-store") return func() { // do nothing - }, true + }, nil } - releaseFn, err := owner.AcquireDistributedLock(context.Background(), cli, bootstrapOwnerKey, 10) + releaseFn, err := owner.AcquireDistributedLock(context.Background(), etcdCli, bootstrapOwnerKey, 10) if err != nil { - return nil, false - } - return releaseFn, true -} - -func forceToLeader(ctx context.Context, s sessiontypes.Session) error { - dom := domain.GetDomain(s) - for !dom.DDL().OwnerManager().IsOwner() { - ownerID, err := dom.DDL().OwnerManager().GetOwnerID(ctx) - if err != nil && (errors.ErrorEqual(err, concurrency.ErrElectionNoLeader) || strings.Contains(err.Error(), "no owner")) { - logutil.BgLogger().Info("ddl owner not found", zap.Error(err)) - time.Sleep(50 * time.Millisecond) - continue - } else if err != nil { - logutil.BgLogger().Error("unexpected error", zap.Error(err)) - return err + if err2 := etcdCli.Close(); err2 != nil { + logutil.BgLogger().Error("failed to close etcd client", zap.Error(err2)) } - err = owner.DeleteLeader(ctx, dom.EtcdClient(), ddl.DDLOwnerKey) - if err != nil { - logutil.BgLogger().Error("unexpected error", zap.Error(err), zap.String("ownerID", ownerID)) - return err - } - time.Sleep(50 * time.Millisecond) + return nil, errors.Trace(err) } - return nil + return func() { + releaseFn() + if err2 := etcdCli.Close(); err2 != nil { + logutil.BgLogger().Error("failed to close etcd client", zap.Error(err2)) + } + }, nil } func checkDistTask(s sessiontypes.Session, ver int64) { @@ -1526,23 +1509,12 @@ func upgrade(s sessiontypes.Session) { } var ver int64 - releaseFn, ok := acquireLock(s) - if !ok { - logutil.BgLogger().Fatal("[upgrade] get ddl owner distributed lock failed", zap.Error(err)) - } ver, err = getBootstrapVersion(s) terror.MustNil(err) if ver >= currentBootstrapVersion { // It is already bootstrapped/upgraded by a higher version TiDB server. - releaseFn() return } - defer releaseFn() - - err = forceToLeader(context.Background(), s) - if err != nil { - logutil.BgLogger().Fatal("[upgrade] force to owner failed", zap.Error(err)) - } checkDistTask(s, ver) printClusterState(s, ver) diff --git a/pkg/session/bootstrap_test.go b/pkg/session/bootstrap_test.go index 05b1b2363046b..1b6d51dc441bf 100644 --- a/pkg/session/bootstrap_test.go +++ b/pkg/session/bootstrap_test.go @@ -170,7 +170,7 @@ func TestBootstrapWithError(t *testing.T) { require.NoError(t, err) dom, err := domap.Get(store) require.NoError(t, err) - require.NoError(t, dom.Start()) + require.NoError(t, dom.Start(ddl.Bootstrap)) domain.BindDomain(se, dom) b, err := checkBootstrapped(se) require.False(t, b) @@ -739,7 +739,7 @@ func TestIndexMergeInNewCluster(t *testing.T) { store, err := mockstore.NewMockStore(mockstore.WithStoreType(mockstore.EmbedUnistore)) require.NoError(t, err) // Indicates we are in a new cluster. - require.Equal(t, int64(notBootstrapped), getStoreBootstrapVersion(store)) + require.Equal(t, int64(notBootstrapped), getStoreBootstrapVersionWithCache(store)) dom, err := BootstrapSession(store) require.NoError(t, err) defer func() { require.NoError(t, store.Close()) }() @@ -1059,7 +1059,7 @@ func TestTiDBOptAdvancedJoinHintInNewCluster(t *testing.T) { store, err := mockstore.NewMockStore(mockstore.WithStoreType(mockstore.EmbedUnistore)) require.NoError(t, err) // Indicates we are in a new cluster. 
- require.Equal(t, int64(notBootstrapped), getStoreBootstrapVersion(store)) + require.Equal(t, int64(notBootstrapped), getStoreBootstrapVersionWithCache(store)) dom, err := BootstrapSession(store) require.NoError(t, err) defer func() { require.NoError(t, store.Close()) }() @@ -1085,7 +1085,7 @@ func TestTiDBCostModelInNewCluster(t *testing.T) { store, err := mockstore.NewMockStore(mockstore.WithStoreType(mockstore.EmbedUnistore)) require.NoError(t, err) // Indicates we are in a new cluster. - require.Equal(t, int64(notBootstrapped), getStoreBootstrapVersion(store)) + require.Equal(t, int64(notBootstrapped), getStoreBootstrapVersionWithCache(store)) dom, err := BootstrapSession(store) require.NoError(t, err) defer func() { require.NoError(t, store.Close()) }() diff --git a/pkg/session/session.go b/pkg/session/session.go index 0af806c52a58c..126e11aad7a40 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -3447,11 +3447,9 @@ func bootstrapSessionImpl(ctx context.Context, store kv.Storage, createSessionsI if err != nil { return nil, err } - ver := getStoreBootstrapVersion(store) - if ver == notBootstrapped { - runInBootstrapSession(store, bootstrap) - } else if ver < currentBootstrapVersion { - runInBootstrapSession(store, upgrade) + ver := getStoreBootstrapVersionWithCache(store) + if ver < currentBootstrapVersion { + runInBootstrapSession(store, ver) } else { err = InitMDLVariable(store) if err != nil { @@ -3504,7 +3502,7 @@ func bootstrapSessionImpl(ctx context.Context, store kv.Storage, createSessionsI // only start the domain after we have initialized some global variables. dom := domain.GetDomain(ses[0]) - err = dom.Start() + err = dom.Start(ddl.Normal) if err != nil { return nil, err } @@ -3667,28 +3665,67 @@ func GetDomain(store kv.Storage) (*domain.Domain, error) { return domap.Get(store) } +func getStartMode(ver int64) ddl.StartMode { + if ver == notBootstrapped { + return ddl.Bootstrap + } else if ver < currentBootstrapVersion { + return ddl.Upgrade + } + return ddl.Normal +} + // runInBootstrapSession create a special session for bootstrap to run. // If no bootstrap and storage is remote, we must use a little lease time to // bootstrap quickly, after bootstrapped, we will reset the lease time. // TODO: Using a bootstrap tool for doing this may be better later. -func runInBootstrapSession(store kv.Storage, bootstrap func(types.Session)) { +func runInBootstrapSession(store kv.Storage, ver int64) { + startMode := getStartMode(ver) + + if startMode == ddl.Upgrade { + // TODO at this time domain must not be created, else it will register server + // info, and cause deadlock, we need to make sure this in a clear way + logutil.BgLogger().Info("[upgrade] get owner lock to upgrade") + releaseFn, err := acquireLock(store) + if err != nil { + logutil.BgLogger().Fatal("[upgrade] get owner lock failed", zap.Error(err)) + } + defer releaseFn() + currVer := mustGetStoreBootstrapVersion(store) + if currVer >= currentBootstrapVersion { + // It is already bootstrapped/upgraded by another TiDB instance, but + // we still need to go through the following domain Start/Close code + // right now as we have already initialized it when creating the session, + // so we switch to normal mode. + // TODO remove this after we can refactor below code out in this case. + logutil.BgLogger().Info("[upgrade] already upgraded by other nodes, switch to normal mode") + startMode = ddl.Normal + } + } s, err := createSession(store) if err != nil { // Bootstrap fail will cause program exit. 
logutil.BgLogger().Fatal("createSession error", zap.Error(err)) } dom := domain.GetDomain(s) - err = dom.Start() + err = dom.Start(startMode) if err != nil { // Bootstrap fail will cause program exit. logutil.BgLogger().Fatal("start domain error", zap.Error(err)) } // For the bootstrap SQLs, the following variables should be compatible with old TiDB versions. + // TODO we should have a createBootstrapSession to init those special variables. s.sessionVars.EnableClusteredIndex = variable.ClusteredIndexDefModeIntOnly s.SetValue(sessionctx.Initing, true) - bootstrap(s) + if startMode == ddl.Bootstrap { + bootstrap(s) + } else if startMode == ddl.Upgrade { + // below sleep is used to mitigate https://github.com/pingcap/tidb/issues/57003, + // to let the older owner have time to notice that it's already retired. + time.Sleep(owner.WaitTimeOnForceOwner) + upgrade(s) + } finishBootstrap(store) s.ClearValue(sessionctx.Initing) @@ -3827,29 +3864,33 @@ const ( notBootstrapped = 0 ) -func getStoreBootstrapVersion(store kv.Storage) int64 { - storeBootstrappedLock.Lock() - defer storeBootstrappedLock.Unlock() - // check in memory - _, ok := storeBootstrapped[store.UUID()] - if ok { - return currentBootstrapVersion - } - +func mustGetStoreBootstrapVersion(store kv.Storage) int64 { var ver int64 // check in kv store ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBootstrap) err := kv.RunInNewTxn(ctx, store, false, func(_ context.Context, txn kv.Transaction) error { var err error - t := meta.NewMutator(txn) + t := meta.NewReader(txn) ver, err = t.GetBootstrapVersion() return err }) if err != nil { - logutil.BgLogger().Fatal("check bootstrapped failed", - zap.Error(err)) + logutil.BgLogger().Fatal("get store bootstrap version failed", zap.Error(err)) + } + return ver +} + +func getStoreBootstrapVersionWithCache(store kv.Storage) int64 { + storeBootstrappedLock.Lock() + defer storeBootstrappedLock.Unlock() + // check in memory + _, ok := storeBootstrapped[store.UUID()] + if ok { + return currentBootstrapVersion } + ver := mustGetStoreBootstrapVersion(store) + if ver > notBootstrapped { // here mean memory is not ok, but other server has already finished it storeBootstrapped[store.UUID()] = true diff --git a/pkg/session/session_test.go b/pkg/session/session_test.go new file mode 100644 index 0000000000000..1c9bfb95609b4 --- /dev/null +++ b/pkg/session/session_test.go @@ -0,0 +1,29 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package session + +import ( + "testing" + + "github.com/pingcap/tidb/pkg/ddl" + "github.com/stretchr/testify/require" +) + +func TestGetStartMode(t *testing.T) { + require.Equal(t, ddl.Normal, getStartMode(currentBootstrapVersion)) + require.Equal(t, ddl.Normal, getStartMode(currentBootstrapVersion+1)) + require.Equal(t, ddl.Upgrade, getStartMode(currentBootstrapVersion-1)) + require.Equal(t, ddl.Bootstrap, getStartMode(0)) +} diff --git a/pkg/session/tidb.go b/pkg/session/tidb.go index 913d8d2d50b06..f0bc68aba465a 100644 --- a/pkg/session/tidb.go +++ b/pkg/session/tidb.go @@ -125,6 +125,8 @@ var ( domains: map[string]*domain.Domain{}, } // store.UUID()-> IfBootstrapped + // TODO this memory flag is meaningless, a store is always bootstrapped once, + // we can always get the version from the store, remove it later. storeBootstrapped = make(map[string]bool) storeBootstrappedLock sync.Mutex
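Taken together, the session-layer changes follow a check / lock / re-check pattern around the stored bootstrap version: read it once to pick the start mode, take the distributed lock, then read it again, because another node may have finished the upgrade while this node waited. Below is a standalone sketch of that control flow, with a plain mutex and an in-memory version standing in for the etcd lock and the kv store:

```go
package main

import (
	"fmt"
	"sync"
)

const currentVer = 217 // illustrative value, not the real currentBootstrapVersion

// fakeStore stands in for the kv store holding the bootstrap version; the
// mutex stands in for the distributed owner lock acquired before upgrading.
type fakeStore struct {
	mu  sync.Mutex
	ver int64
}

// maybeUpgrade reports whether this node actually ran the upgrade.
func maybeUpgrade(s *fakeStore) bool {
	if s.ver >= currentVer {
		return false // already up to date: normal start mode
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.ver >= currentVer {
		return false // another node upgraded while we waited: switch back to normal mode
	}
	s.ver = currentVer // run the upgrade steps while holding the lock
	return true
}

func main() {
	store := &fakeStore{ver: 198}
	fmt.Println(maybeUpgrade(store)) // true: this node performed the upgrade
	fmt.Println(maybeUpgrade(store)) // false: already upgraded
}
```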
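Finally, the core API change is that every Start caller now declares why the node is starting. A minimal standalone sketch of the Upgrade-mode guard follows; the type and the check are re-declared locally for illustration and are not the pkg/ddl code itself:

```go
package main

import (
	"errors"
	"fmt"
)

// StartMode is re-declared here only so the sketch compiles on its own; the
// real type is the one added to pkg/ddl in this diff.
type StartMode string

const (
	Normal    StartMode = "normal"
	Bootstrap StartMode = "bootstrap"
	Upgrade   StartMode = "upgrade"
)

// checkStart mirrors the guard added to (*ddl).Start: Upgrade mode is only
// valid when this node may campaign for ownership (tidb_enable_ddl), because
// the whole point of Upgrade mode is to force this node to become the owner.
func checkStart(mode StartMode, campaignOwner bool) error {
	if mode == Upgrade && !campaignOwner {
		return errors.New("DDL must be enabled when upgrading")
	}
	return nil
}

func main() {
	fmt.Println(checkStart(Upgrade, false)) // DDL must be enabled when upgrading
	fmt.Println(checkStart(Upgrade, true))  // <nil>
	fmt.Println(checkStart(Normal, false))  // <nil>
}
```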