From 444b0dbb6d208f8da7552efe5ac0ed2f3d1a3d57 Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Mon, 2 Dec 2024 18:06:06 +0800 Subject: [PATCH 1/5] remove local tso Signed-off-by: okJiang <819421878@qq.com> --- client/client.go | 4 +-- client/clients/tso/tso_client.go | 8 ++--- pkg/mcs/tso/server/config.go | 5 +-- pkg/mcs/tso/server/config_test.go | 5 --- pkg/mcs/tso/server/server.go | 9 ----- pkg/storage/endpoint/tso.go | 6 ++-- pkg/tso/allocator_manager.go | 14 -------- pkg/tso/global_allocator.go | 20 ++--------- pkg/tso/keyspace_group_manager.go | 6 ++-- pkg/tso/keyspace_group_manager_test.go | 1 - pkg/tso/testutil.go | 1 - pkg/tso/tso.go | 34 +++---------------- pkg/utils/keypath/key_path.go | 14 -------- server/config/config.go | 7 +--- server/server.go | 5 --- tests/cluster.go | 5 --- .../mcs/tso/keyspace_group_manager_test.go | 2 -- 17 files changed, 19 insertions(+), 127 deletions(-) diff --git a/client/client.go b/client/client.go index bf982f4fea0..99ea8c76980 100644 --- a/client/client.go +++ b/client/client.go @@ -568,7 +568,7 @@ func (c *client) GetTSAsync(ctx context.Context) tso.TSFuture { // GetLocalTSAsync implements the TSOClient interface. // -// Deprecated: Local TSO will be completely removed in the future. Currently, regardless of the +// Deprecated: Currently, regardless of the // parameters passed in, this method will default to returning the global TSO. func (c *client) GetLocalTSAsync(ctx context.Context, _ string) tso.TSFuture { return c.GetTSAsync(ctx) @@ -582,7 +582,7 @@ func (c *client) GetTS(ctx context.Context) (physical int64, logical int64, err // GetLocalTS implements the TSOClient interface. // -// Deprecated: Local TSO will be completely removed in the future. Currently, regardless of the +// Deprecated: Currently, regardless of the // parameters passed in, this method will default to returning the global TSO. func (c *client) GetLocalTS(ctx context.Context, _ string) (physical int64, logical int64, err error) { return c.GetTS(ctx) diff --git a/client/clients/tso/tso_client.go b/client/clients/tso/tso_client.go index 9c7075fe3bb..9fe7d65ddaa 100644 --- a/client/clients/tso/tso_client.go +++ b/client/clients/tso/tso_client.go @@ -58,13 +58,13 @@ type Client interface { // GetLocalTS gets a local timestamp from PD or TSO microservice. // - // Deprecated: Local TSO will be completely removed in the future. Currently, regardless of the - // parameters passed in, this method will default to returning the global TSO. + // Deprecated: Currently, regardless of the parameters passed in, this + // method will default to returning the global TSO. GetLocalTS(ctx context.Context, _ string) (int64, int64, error) // GetLocalTSAsync gets a local timestamp from PD or TSO microservice, without block the caller. // - // Deprecated: Local TSO will be completely removed in the future. Currently, regardless of the - // parameters passed in, this method will default to returning the global TSO. + // Deprecated: Currently, regardless of the parameters passed in, this + // method will default to returning the global TSO. GetLocalTSAsync(ctx context.Context, _ string) TSFuture } diff --git a/pkg/mcs/tso/server/config.go b/pkg/mcs/tso/server/config.go index 0973042b912..2aaa54114da 100644 --- a/pkg/mcs/tso/server/config.go +++ b/pkg/mcs/tso/server/config.go @@ -64,10 +64,7 @@ type Config struct { // the primary/leader again. Etcd only supports seconds TTL, so here is second too. LeaderLease int64 `toml:"lease" json:"lease"` - // EnableLocalTSO is used to enable the Local TSO Allocator feature, - // which allows the PD server to generate Local TSO for certain DC-level transactions. - // To make this feature meaningful, user has to set the "zone" label for the PD server - // to indicate which DC this PD belongs to. + // Deprecated EnableLocalTSO bool `toml:"enable-local-tso" json:"enable-local-tso"` // TSOSaveInterval is the interval to save timestamp. diff --git a/pkg/mcs/tso/server/config_test.go b/pkg/mcs/tso/server/config_test.go index 2bd27a67492..2bafec30aa9 100644 --- a/pkg/mcs/tso/server/config_test.go +++ b/pkg/mcs/tso/server/config_test.go @@ -36,7 +36,6 @@ func TestConfigBasic(t *testing.T) { re.Equal(defaultBackendEndpoints, cfg.BackendEndpoints) re.Equal(defaultListenAddr, cfg.ListenAddr) re.Equal(constant.DefaultLeaderLease, cfg.LeaderLease) - re.False(cfg.EnableLocalTSO) re.True(cfg.EnableGRPCGateway) re.Equal(defaultTSOSaveInterval, cfg.TSOSaveInterval.Duration) re.Equal(defaultTSOUpdatePhysicalInterval, cfg.TSOUpdatePhysicalInterval.Duration) @@ -48,7 +47,6 @@ func TestConfigBasic(t *testing.T) { cfg.ListenAddr = "test-listen-addr" cfg.AdvertiseListenAddr = "test-advertise-listen-addr" cfg.LeaderLease = 123 - cfg.EnableLocalTSO = true cfg.TSOSaveInterval.Duration = time.Duration(10) * time.Second cfg.TSOUpdatePhysicalInterval.Duration = time.Duration(100) * time.Millisecond cfg.MaxResetTSGap.Duration = time.Duration(1) * time.Hour @@ -58,7 +56,6 @@ func TestConfigBasic(t *testing.T) { re.Equal("test-listen-addr", cfg.GetListenAddr()) re.Equal("test-advertise-listen-addr", cfg.GetAdvertiseListenAddr()) re.Equal(int64(123), cfg.GetLeaderLease()) - re.True(cfg.EnableLocalTSO) re.Equal(time.Duration(10)*time.Second, cfg.TSOSaveInterval.Duration) re.Equal(time.Duration(100)*time.Millisecond, cfg.TSOUpdatePhysicalInterval.Duration) re.Equal(time.Duration(1)*time.Hour, cfg.MaxResetTSGap.Duration) @@ -74,7 +71,6 @@ name = "tso-test-name" data-dir = "/var/lib/tso" enable-grpc-gateway = false lease = 123 -enable-local-tso = true tso-save-interval = "10s" tso-update-physical-interval = "100ms" max-gap-reset-ts = "1h" @@ -92,7 +88,6 @@ max-gap-reset-ts = "1h" re.Equal("test-advertise-listen-addr", cfg.GetAdvertiseListenAddr()) re.Equal("/var/lib/tso", cfg.DataDir) re.Equal(int64(123), cfg.GetLeaderLease()) - re.True(cfg.EnableLocalTSO) re.Equal(time.Duration(10)*time.Second, cfg.TSOSaveInterval.Duration) re.Equal(time.Duration(100)*time.Millisecond, cfg.TSOUpdatePhysicalInterval.Duration) re.Equal(time.Duration(1)*time.Hour, cfg.MaxResetTSGap.Duration) diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index 04e81c2d48e..d2974075e94 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -272,15 +272,6 @@ func (s *Server) GetTSOAllocatorManager(keyspaceGroupID uint32) (*tso.AllocatorM return s.keyspaceGroupManager.GetAllocatorManager(keyspaceGroupID) } -// IsLocalRequest checks if the forwarded host is the current host -func (*Server) IsLocalRequest(forwardedHost string) bool { - // TODO: Check if the forwarded host is the current host. - // The logic is depending on etcd service mode -- if the TSO service - // uses the embedded etcd, check against ClientUrls; otherwise check - // against the cluster membership. - return forwardedHost == "" -} - // ValidateInternalRequest checks if server is closed, which is used to validate // the gRPC communication between TSO servers internally. // TODO: Check if the sender is from the global TSO allocator diff --git a/pkg/storage/endpoint/tso.go b/pkg/storage/endpoint/tso.go index a656f6d2945..77841529e98 100644 --- a/pkg/storage/endpoint/tso.go +++ b/pkg/storage/endpoint/tso.go @@ -37,9 +37,9 @@ type TSOStorage interface { var _ TSOStorage = (*StorageEndpoint)(nil) -// LoadTimestamp will get all time windows of Local/Global TSOs from etcd and return the biggest one. -// For the Global TSO, loadTimestamp will get all Local and Global TSO time windows persisted in etcd and choose the biggest one. -// For the Local TSO, loadTimestamp will only get its own dc-location time window persisted before. +// LoadTimestamp will get all time windows of Global TSOs from etcd and return the biggest one. +// TODO: Due to local TSO is deprecated, maybe we do not need to load timestamp +// by prefix, we can just load the timestamp by the key. func (se *StorageEndpoint) LoadTimestamp(prefix string) (time.Time, error) { prefixEnd := clientv3.GetPrefixRangeEnd(prefix) keys, values, err := se.LoadRange(prefix, prefixEnd, 0) diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index 8d5589143aa..467946e0729 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -17,7 +17,6 @@ package tso import ( "context" "math" - "path" "runtime/trace" "strconv" "sync" @@ -43,8 +42,6 @@ const ( checkStep = time.Minute patrolStep = time.Second defaultAllocatorLeaderLease = 3 - localTSOAllocatorEtcdPrefix = "lta" - localTSOSuffixEtcdPrefix = "lts" ) var ( @@ -217,17 +214,6 @@ func (am *AllocatorManager) getGroupIDStr() string { return strconv.FormatUint(uint64(am.kgID), 10) } -// GetTimestampPath returns the timestamp path in etcd. -func (am *AllocatorManager) GetTimestampPath() string { - if am == nil { - return "" - } - - am.mu.RLock() - defer am.mu.RUnlock() - return path.Join(am.rootPath, am.mu.allocatorGroup.allocator.GetTimestampPath()) -} - // tsoAllocatorLoop is used to run the TSO Allocator updating daemon. func (am *AllocatorManager) tsoAllocatorLoop() { defer logutil.LogPanic() diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 52c30c38f1e..c62b49a603c 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -47,14 +47,6 @@ type Allocator interface { IsInitialize() bool // UpdateTSO is used to update the TSO in memory and the time window in etcd. UpdateTSO() error - // GetTimestampPath returns the timestamp path in etcd, which is: - // 1. for the default keyspace group: - // a. timestamp in /pd/{cluster_id}/timestamp - // b. lta/{dc-location}/timestamp in /pd/{cluster_id}/lta/{dc-location}/timestamp - // 1. for the non-default keyspace groups: - // a. {group}/gts/timestamp in /ms/{cluster_id}/tso/{group}/gta/timestamp - // b. {group}/lts/{dc-location}/timestamp in /ms/{cluster_id}/tso/{group}/lta/{dc-location}/timestamp - GetTimestampPath() string // SetTSO sets the physical part with given TSO. It's mainly used for BR restore. // Cannot set the TSO smaller than now in any case. // if ignoreSmaller=true, if input ts is smaller than current, ignore silently, else return error @@ -68,6 +60,8 @@ type Allocator interface { } // GlobalTSOAllocator is the global single point TSO allocator. +// TODO: Local TSO allocator is deprecated now, we can update the name to +// TSOAllocator and remove the `Global` concept. type GlobalTSOAllocator struct { ctx context.Context cancel context.CancelFunc @@ -132,19 +126,9 @@ func (gta *GlobalTSOAllocator) getGroupID() uint32 { return gta.am.getGroupID() } -// GetTimestampPath returns the timestamp path in etcd. -func (gta *GlobalTSOAllocator) GetTimestampPath() string { - if gta == nil || gta.timestampOracle == nil { - return "" - } - return gta.timestampOracle.GetTimestampPath() -} - // Initialize will initialize the created global TSO allocator. func (gta *GlobalTSOAllocator) Initialize(int) error { gta.tsoAllocatorRoleGauge.Set(1) - // The suffix of a Global TSO should always be 0. - gta.timestampOracle.suffix = 0 return gta.timestampOracle.SyncTimestamp() } diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 09f20609920..7fe86c03ec8 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -66,7 +66,7 @@ const ( type state struct { syncutil.RWMutex // ams stores the allocator managers of the keyspace groups. Each keyspace group is - // assigned with an allocator manager managing its global/local tso allocators. + // assigned with an allocator manager managing its global tso allocators. // Use a fixed size array to maximize the efficiency of concurrent access to // different keyspace groups for tso service. ams [constant.MaxKeyspaceGroupCountInUse]*AllocatorManager @@ -790,8 +790,7 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg) am.startGlobalAllocatorLoop() log.Info("created allocator manager", - zap.Uint32("keyspace-group-id", group.ID), - zap.String("timestamp-path", am.GetTimestampPath())) + zap.Uint32("keyspace-group-id", group.ID)) kgm.Lock() group.KeyspaceLookupTable = make(map[uint32]struct{}) for _, kid := range group.Keyspaces { @@ -1514,7 +1513,6 @@ func (kgm *KeyspaceGroupManager) deletedGroupCleaner() { log.Info("delete the keyspace group tso key", zap.Uint32("keyspace-group-id", groupID)) // Clean up the remaining TSO keys. - // TODO: support the Local TSO Allocator clean up. err := kgm.tsoSvcStorage.DeleteTimestamp( keypath.TimestampPath( keypath.KeyspaceGroupGlobalTSPath(groupID), diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index dea0b00f4f0..be3d53785cd 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -87,7 +87,6 @@ func (suite *keyspaceGroupManagerTestSuite) createConfig() *TestServiceConfig { ListenAddr: addr, AdvertiseListenAddr: addr, LeaderLease: constant.DefaultLeaderLease, - LocalTSOEnabled: false, TSOUpdatePhysicalInterval: 50 * time.Millisecond, TSOSaveInterval: time.Duration(constant.DefaultLeaderLease) * time.Second, MaxResetTSGap: time.Hour * 24, diff --git a/pkg/tso/testutil.go b/pkg/tso/testutil.go index e3d04f55813..336d1414d98 100644 --- a/pkg/tso/testutil.go +++ b/pkg/tso/testutil.go @@ -29,7 +29,6 @@ type TestServiceConfig struct { ListenAddr string // Address the service listens on. AdvertiseListenAddr string // Address the service advertises to the clients. LeaderLease int64 // Leader lease. - LocalTSOEnabled bool // Whether local TSO is enabled. TSOUpdatePhysicalInterval time.Duration // Interval to update TSO in physical storage. TSOSaveInterval time.Duration // Interval to save TSO to physical storage. MaxResetTSGap time.Duration // Maximum gap to reset TSO. diff --git a/pkg/tso/tso.go b/pkg/tso/tso.go index 38a4c989093..ec81a8fb0dd 100644 --- a/pkg/tso/tso.go +++ b/pkg/tso/tso.go @@ -75,7 +75,6 @@ type timestampOracle struct { tsoMux *tsoObject // last timestamp window stored in etcd lastSavedTime atomic.Value // stored as time.Time - suffix int // pre-initialized metrics metrics *tsoMetrics @@ -116,9 +115,6 @@ func (t *timestampOracle) generateTSO(ctx context.Context, count int64, suffixBi physical = t.tsoMux.physical.UnixNano() / int64(time.Millisecond) t.tsoMux.logical += count logical = t.tsoMux.logical - if suffixBits > 0 && t.suffix >= 0 { - logical = t.calibrateLogical(logical, suffixBits) - } // Return the last update time lastUpdateTime = t.tsoMux.updateTime t.tsoMux.updateTime = time.Now() @@ -133,28 +129,6 @@ func (t *timestampOracle) getLastSavedTime() time.Time { return last.(time.Time) } -// Because the Local TSO in each Local TSO Allocator is independent, so they are possible -// to be the same at sometimes, to avoid this case, we need to use the logical part of the -// Local TSO to do some differentiating work. -// For example, we have three DCs: dc-1, dc-2 and dc-3. The bits of suffix is defined by -// the const suffixBits. Then, for dc-2, the suffix may be 1 because it's persisted -// in etcd with the value of 1. -// Once we get a normal TSO like this (18 bits): xxxxxxxxxxxxxxxxxx. We will make the TSO's -// low bits of logical part from each DC looks like: -// -// global: xxxxxxxxxx00000000 -// dc-1: xxxxxxxxxx00000001 -// dc-2: xxxxxxxxxx00000010 -// dc-3: xxxxxxxxxx00000011 -func (t *timestampOracle) calibrateLogical(rawLogical int64, suffixBits int) int64 { - return rawLogical< 0)) @@ -209,7 +183,7 @@ func (t *timestampOracle) SyncTimestamp() error { }) save := next.Add(t.saveInterval) start := time.Now() - if err = t.storage.SaveTimestamp(t.GetTimestampPath(), save); err != nil { + if err = t.storage.SaveTimestamp(keypath.TimestampPath(t.tsPath), save); err != nil { t.metrics.errSaveSyncTSEvent.Inc() return err } @@ -277,7 +251,7 @@ func (t *timestampOracle) resetUserTimestamp(leadership *election.Leadership, ts if typeutil.SubRealTimeByWallClock(t.getLastSavedTime(), nextPhysical) <= UpdateTimestampGuard { save := nextPhysical.Add(t.saveInterval) start := time.Now() - if err := t.storage.SaveTimestamp(t.GetTimestampPath(), save); err != nil { + if err := t.storage.SaveTimestamp(keypath.TimestampPath(t.tsPath), save); err != nil { t.metrics.errSaveResetTSEvent.Inc() return err } @@ -361,10 +335,10 @@ func (t *timestampOracle) UpdateTimestamp() error { if typeutil.SubRealTimeByWallClock(t.getLastSavedTime(), next) <= UpdateTimestampGuard { save := next.Add(t.saveInterval) start := time.Now() - if err := t.storage.SaveTimestamp(t.GetTimestampPath(), save); err != nil { + if err := t.storage.SaveTimestamp(keypath.TimestampPath(t.tsPath), save); err != nil { log.Warn("save timestamp failed", logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0), - zap.String("timestamp-path", t.GetTimestampPath()), + zap.String("timestamp-path", keypath.TimestampPath(t.tsPath)), zap.Error(err)) t.metrics.errSaveUpdateTSEvent.Inc() return err diff --git a/pkg/utils/keypath/key_path.go b/pkg/utils/keypath/key_path.go index 03d497dc408..3e6ae91b997 100644 --- a/pkg/utils/keypath/key_path.go +++ b/pkg/utils/keypath/key_path.go @@ -400,20 +400,6 @@ func TimestampPath(tsPath string) string { return path.Join(tsPath, TimestampKey) } -// FullTimestampPath returns the full timestamp path. -// 1. for the default keyspace group: -// /pd/{cluster_id}/timestamp -// 2. for the non-default keyspace groups: -// /ms/{cluster_id}/tso/{group}/gta/timestamp -func FullTimestampPath(groupID uint32) string { - rootPath := TSOSvcRootPath() - tsPath := TimestampPath(KeyspaceGroupGlobalTSPath(groupID)) - if groupID == constant.DefaultKeyspaceGroupID { - rootPath = LegacyRootPath() - } - return path.Join(rootPath, tsPath) -} - const ( registryKey = "registry" ) diff --git a/server/config/config.go b/server/config/config.go index 7ad03baab77..a5183f119f2 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -96,10 +96,7 @@ type Config struct { // be automatically clamped to the range. TSOUpdatePhysicalInterval typeutil.Duration `toml:"tso-update-physical-interval" json:"tso-update-physical-interval"` - // EnableLocalTSO is used to enable the Local TSO Allocator feature, - // which allows the PD server to generate Local TSO for certain DC-level transactions. - // To make this feature meaningful, user has to set the "zone" label for the PD server - // to indicate which DC this PD belongs to. + // Deprecated EnableLocalTSO bool `toml:"enable-local-tso" json:"enable-local-tso"` Metric metricutil.MetricConfig `toml:"metric" json:"metric"` @@ -115,8 +112,6 @@ type Config struct { // Labels indicates the labels set for **this** PD server. The labels describe some specific properties // like `zone`/`rack`/`host`. Currently, labels won't affect the PD server except for some special // label keys. Now we have following special keys: - // 1. 'zone' is a special key that indicates the DC location of this PD server. If it is set, the value for this - // will be used to determine which DC's Local TSO service this PD will provide with if EnableLocalTSO is true. Labels map[string]string `toml:"labels" json:"labels"` // QuotaBackendBytes Raise alarms when backend size exceeds the given quota. 0 means use the default quota. diff --git a/server/server.go b/server/server.go index e49496ff8d8..e36ef35a333 100644 --- a/server/server.go +++ b/server/server.go @@ -936,11 +936,6 @@ func (s *Server) GetServiceMiddlewareConfig() *config.ServiceMiddlewareConfig { return cfg } -// SetEnableLocalTSO sets enable-local-tso flag of Server. This function only for test. -func (s *Server) SetEnableLocalTSO(enableLocalTSO bool) { - s.cfg.EnableLocalTSO = enableLocalTSO -} - // GetConfig gets the config information. func (s *Server) GetConfig() *config.Config { cfg := s.cfg.Clone() diff --git a/tests/cluster.go b/tests/cluster.go index bf17f79a87c..21807d2aadc 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -185,11 +185,6 @@ func (s *TestServer) GetConfig() *config.Config { return s.server.GetConfig() } -// SetEnableLocalTSO sets the enable-local-tso flag of the TestServer. -func (s *TestServer) SetEnableLocalTSO(enableLocalTSO bool) { - s.server.SetEnableLocalTSO(enableLocalTSO) -} - // GetPersistOptions returns the current TestServer's schedule option. func (s *TestServer) GetPersistOptions() *config.PersistOptions { s.RLock() diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 51f3fd37295..75d74168311 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -221,8 +221,6 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe // for primary election. rootPath := keypath.TSOSvcRootPath() primaryPath := keypath.KeyspaceGroupPrimaryPath(rootPath, param.keyspaceGroupID) - timestampPath := keypath.FullTimestampPath(param.keyspaceGroupID) - re.Equal(timestampPath, am.GetTimestampPath()) re.Equal(primaryPath, am.GetMember().GetLeaderPath()) served = true From eedb168da8565d947f300606977059caf8e895ba Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Tue, 3 Dec 2024 14:08:08 +0800 Subject: [PATCH 2/5] fix ci Signed-off-by: okJiang <819421878@qq.com> --- pkg/tso/allocator_manager.go | 16 ---------------- pkg/tso/global_allocator.go | 2 +- pkg/tso/tso.go | 24 +++++++----------------- pkg/utils/tsoutil/tso_dispatcher.go | 14 +++++++------- pkg/utils/tsoutil/tso_request.go | 16 +++++++--------- 5 files changed, 22 insertions(+), 50 deletions(-) diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index 467946e0729..65f61e819d1 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -16,7 +16,6 @@ package tso import ( "context" - "math" "runtime/trace" "strconv" "sync" @@ -240,21 +239,6 @@ func (am *AllocatorManager) GetMember() ElectionMember { return am.member } -// GetSuffixBits calculates the bits of suffix sign -// by the max number of suffix so far, -// which will be used in the TSO logical part. -func (am *AllocatorManager) GetSuffixBits() int { - am.mu.RLock() - defer am.mu.RUnlock() - return CalSuffixBits(am.mu.maxSuffix) -} - -// CalSuffixBits calculates the bits of suffix by the max suffix sign. -func CalSuffixBits(maxSuffix int32) int { - // maxSuffix + 1 because we have the Global TSO holds 0 as the suffix sign - return int(math.Ceil(math.Log2(float64(maxSuffix + 1)))) -} - // AllocatorDaemon is used to update every allocator's TSO and check whether we have // any new local allocator that needs to be set up. func (am *AllocatorManager) AllocatorDaemon(ctx context.Context) { diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 467dcb03f87..2fe0df3e000 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -159,7 +159,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (p return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested pd %s of cluster", errs.NotLeaderErr)) } - return gta.timestampOracle.getTS(ctx, gta.member.GetLeadership(), count, 0) + return gta.timestampOracle.getTS(ctx, gta.member.GetLeadership(), count) } // Reset is used to reset the TSO allocator. diff --git a/pkg/tso/tso.go b/pkg/tso/tso.go index ec81a8fb0dd..0210c98626b 100644 --- a/pkg/tso/tso.go +++ b/pkg/tso/tso.go @@ -43,8 +43,6 @@ const ( // When a TSO's logical time reaches this limit, // the physical time will be forced to increase. maxLogical = int64(1 << 18) - // MaxSuffixBits indicates the max number of suffix bits. - MaxSuffixBits = 4 // jetLagWarningThreshold is the warning threshold of jetLag in `timestampOracle.UpdateTimestamp`. // In case of small `updatePhysicalInterval`, the `3 * updatePhysicalInterval` would also is small, // and trigger unnecessary warnings about clock offset. @@ -55,9 +53,8 @@ const ( // tsoObject is used to store the current TSO in memory with a RWMutex lock. type tsoObject struct { syncutil.RWMutex - physical time.Time - logical int64 - updateTime time.Time + physical time.Time + logical int64 } // timestampOracle is used to maintain the logic of TSO. @@ -91,7 +88,6 @@ func (t *timestampOracle) setTSOPhysical(next time.Time, force bool) { if typeutil.SubTSOPhysicalByWallClock(next, t.tsoMux.physical) > 0 { t.tsoMux.physical = next t.tsoMux.logical = 0 - t.tsoMux.updateTime = time.Now() } } @@ -105,20 +101,17 @@ func (t *timestampOracle) getTSO() (time.Time, int64) { } // generateTSO will add the TSO's logical part with the given count and returns the new TSO result. -func (t *timestampOracle) generateTSO(ctx context.Context, count int64, suffixBits int) (physical int64, logical int64, lastUpdateTime time.Time) { +func (t *timestampOracle) generateTSO(ctx context.Context, count int64) (physical int64, logical int64) { defer trace.StartRegion(ctx, "timestampOracle.generateTSO").End() t.tsoMux.Lock() defer t.tsoMux.Unlock() if t.tsoMux.physical == typeutil.ZeroTime { - return 0, 0, typeutil.ZeroTime + return 0, 0 } physical = t.tsoMux.physical.UnixNano() / int64(time.Millisecond) t.tsoMux.logical += count logical = t.tsoMux.logical - // Return the last update time - lastUpdateTime = t.tsoMux.updateTime - t.tsoMux.updateTime = time.Now() - return physical, logical, lastUpdateTime + return physical, logical } func (t *timestampOracle) getLastSavedTime() time.Time { @@ -261,7 +254,6 @@ func (t *timestampOracle) resetUserTimestamp(leadership *election.Leadership, ts // save into memory only if nextPhysical or nextLogical is greater. t.tsoMux.physical = nextPhysical t.tsoMux.logical = int64(nextLogical) - t.tsoMux.updateTime = time.Now() t.metrics.resetTSOOKEvent.Inc() return nil } @@ -355,7 +347,7 @@ func (t *timestampOracle) UpdateTimestamp() error { var maxRetryCount = 10 // getTS is used to get a timestamp. -func (t *timestampOracle) getTS(ctx context.Context, leadership *election.Leadership, count uint32, suffixBits int) (pdpb.Timestamp, error) { +func (t *timestampOracle) getTS(ctx context.Context, leadership *election.Leadership, count uint32) (pdpb.Timestamp, error) { defer trace.StartRegion(ctx, "timestampOracle.getTS").End() var resp pdpb.Timestamp if count == 0 { @@ -373,7 +365,7 @@ func (t *timestampOracle) getTS(ctx context.Context, leadership *election.Leader return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("timestamp in memory isn't initialized") } // Get a new TSO result with the given count - resp.Physical, resp.Logical, _ = t.generateTSO(ctx, int64(count), suffixBits) + resp.Physical, resp.Logical = t.generateTSO(ctx, int64(count)) if resp.GetPhysical() == 0 { return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("timestamp in memory has been reset") } @@ -390,7 +382,6 @@ func (t *timestampOracle) getTS(ctx context.Context, leadership *election.Leader if !leadership.Check() { return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested %s anymore", errs.NotLeaderErr)) } - resp.SuffixBits = uint32(suffixBits) return resp, nil } t.metrics.exceededMaxRetryEvent.Inc() @@ -404,6 +395,5 @@ func (t *timestampOracle) ResetTimestamp() { log.Info("reset the timestamp in memory", logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0)) t.tsoMux.physical = typeutil.ZeroTime t.tsoMux.logical = 0 - t.tsoMux.updateTime = typeutil.ZeroTime t.lastSavedTime.Store(typeutil.ZeroTime) } diff --git a/pkg/utils/tsoutil/tso_dispatcher.go b/pkg/utils/tsoutil/tso_dispatcher.go index c4aa96274e1..86afe75e6c5 100644 --- a/pkg/utils/tsoutil/tso_dispatcher.go +++ b/pkg/utils/tsoutil/tso_dispatcher.go @@ -171,23 +171,23 @@ func (s *TSODispatcher) processRequests(forwardStream stream, requests []Request s.tsoProxyBatchSize.Observe(float64(count)) // Split the response ts := resp.GetTimestamp() - physical, logical, suffixBits := ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits() + physical, logical := ts.GetPhysical(), ts.GetLogical() // `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request. // This is different from the logic of client batch, for example, if we have a largest ts whose logical part is 10, // count is 5, then the splitting results should be 5 and 10. - firstLogical := addLogical(logical, -int64(count), suffixBits) - return s.finishRequest(requests, physical, firstLogical, suffixBits) + firstLogical := addLogical(logical, -int64(count)) + return s.finishRequest(requests, physical, firstLogical) } // Because of the suffix, we need to shift the count before we add it to the logical part. -func addLogical(logical, count int64, suffixBits uint32) int64 { - return logical + count< Date: Tue, 3 Dec 2024 15:45:24 +0800 Subject: [PATCH 3/5] fix typo Signed-off-by: okJiang <819421878@qq.com> --- pkg/storage/endpoint/service_middleware.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/storage/endpoint/service_middleware.go b/pkg/storage/endpoint/service_middleware.go index 3859dab4d62..35f0606f9d0 100644 --- a/pkg/storage/endpoint/service_middleware.go +++ b/pkg/storage/endpoint/service_middleware.go @@ -29,7 +29,7 @@ type ServiceMiddlewareStorage interface { var _ ServiceMiddlewareStorage = (*StorageEndpoint)(nil) -// LoadServiceMiddlewareConfig loads service middleware config from keypath.KeyspaceGroupLocalTSPath then unmarshal it to cfg. +// LoadServiceMiddlewareConfig loads service middleware config from ServiceMiddlewarePath then unmarshal it to cfg. func (se *StorageEndpoint) LoadServiceMiddlewareConfig(cfg any) (bool, error) { value, err := se.Load(keypath.ServiceMiddlewarePath) if err != nil || value == "" { @@ -42,7 +42,7 @@ func (se *StorageEndpoint) LoadServiceMiddlewareConfig(cfg any) (bool, error) { return true, nil } -// SaveServiceMiddlewareConfig stores marshallable cfg to the keypath.KeyspaceGroupLocalTSPath. +// SaveServiceMiddlewareConfig stores marshallable cfg to the ServiceMiddlewarePath. func (se *StorageEndpoint) SaveServiceMiddlewareConfig(cfg any) error { return se.saveJSON(keypath.ServiceMiddlewarePath, cfg) } From fe0f17b048b0f2da9b15dba961aeb3e79d16bfcc Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Tue, 10 Dec 2024 10:38:13 +0800 Subject: [PATCH 4/5] fix comment Signed-off-by: okJiang <819421878@qq.com> --- client/client.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/client/client.go b/client/client.go index 99ea8c76980..a796b94ce28 100644 --- a/client/client.go +++ b/client/client.go @@ -568,8 +568,10 @@ func (c *client) GetTSAsync(ctx context.Context) tso.TSFuture { // GetLocalTSAsync implements the TSOClient interface. // -// Deprecated: Currently, regardless of the -// parameters passed in, this method will default to returning the global TSO. +// Deprecated: the Local TSO feature has been deprecated. Regardless of the +// parameters passed, the behavior of this interface will be equivalent to +// `GetTSAsync/GetTS`. If you want to use a separately deployed TSO service, +// please refer to the deployment of the TSO microservice. func (c *client) GetLocalTSAsync(ctx context.Context, _ string) tso.TSFuture { return c.GetTSAsync(ctx) } @@ -582,8 +584,10 @@ func (c *client) GetTS(ctx context.Context) (physical int64, logical int64, err // GetLocalTS implements the TSOClient interface. // -// Deprecated: Currently, regardless of the -// parameters passed in, this method will default to returning the global TSO. +// Deprecated: the Local TSO feature has been deprecated. Regardless of the +// parameters passed, the behavior of this interface will be equivalent to +// `GetTSAsync/GetTS`. If you want to use a separately deployed TSO service, +// please refer to the deployment of the TSO microservice. func (c *client) GetLocalTS(ctx context.Context, _ string) (physical int64, logical int64, err error) { return c.GetTS(ctx) } From cf0d46addf42d79b5eb95fe5e85fab8ddd014c1f Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Wed, 11 Dec 2024 11:25:49 +0800 Subject: [PATCH 5/5] modify comment Signed-off-by: okJiang <819421878@qq.com> --- client/client.go | 8 ++------ client/clients/tso/tso_client.go | 16 ++++++++-------- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/client/client.go b/client/client.go index 7a8dc19c9ea..002213e2a76 100644 --- a/client/client.go +++ b/client/client.go @@ -501,11 +501,9 @@ func (c *client) GetTSAsync(ctx context.Context) tso.TSFuture { return c.inner.dispatchTSORequestWithRetry(ctx) } -// GetLocalTSAsync implements the TSOClient interface. -// // Deprecated: the Local TSO feature has been deprecated. Regardless of the // parameters passed, the behavior of this interface will be equivalent to -// `GetTSAsync/GetTS`. If you want to use a separately deployed TSO service, +// `GetTSAsync`. If you want to use a separately deployed TSO service, // please refer to the deployment of the TSO microservice. func (c *client) GetLocalTSAsync(ctx context.Context, _ string) tso.TSFuture { return c.GetTSAsync(ctx) @@ -517,11 +515,9 @@ func (c *client) GetTS(ctx context.Context) (physical int64, logical int64, err return resp.Wait() } -// GetLocalTS implements the TSOClient interface. -// // Deprecated: the Local TSO feature has been deprecated. Regardless of the // parameters passed, the behavior of this interface will be equivalent to -// `GetTSAsync/GetTS`. If you want to use a separately deployed TSO service, +// `GetTS`. If you want to use a separately deployed TSO service, // please refer to the deployment of the TSO microservice. func (c *client) GetLocalTS(ctx context.Context, _ string) (physical int64, logical int64, err error) { return c.GetTS(ctx) diff --git a/client/clients/tso/tso_client.go b/client/clients/tso/tso_client.go index 9fe7d65ddaa..68e2163d191 100644 --- a/client/clients/tso/tso_client.go +++ b/client/clients/tso/tso_client.go @@ -56,15 +56,15 @@ type Client interface { // the TSO microservice. GetMinTS(ctx context.Context) (int64, int64, error) - // GetLocalTS gets a local timestamp from PD or TSO microservice. - // - // Deprecated: Currently, regardless of the parameters passed in, this - // method will default to returning the global TSO. + // Deprecated: the Local TSO feature has been deprecated. Regardless of the + // parameters passed, the behavior of this interface will be equivalent to + // `GetTS`. If you want to use a separately deployed TSO service, + // please refer to the deployment of the TSO microservice. GetLocalTS(ctx context.Context, _ string) (int64, int64, error) - // GetLocalTSAsync gets a local timestamp from PD or TSO microservice, without block the caller. - // - // Deprecated: Currently, regardless of the parameters passed in, this - // method will default to returning the global TSO. + // Deprecated: the Local TSO feature has been deprecated. Regardless of the + // parameters passed, the behavior of this interface will be equivalent to + // `GetTSAsync`. If you want to use a separately deployed TSO service, + // please refer to the deployment of the TSO microservice. GetLocalTSAsync(ctx context.Context, _ string) TSFuture }