Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/tso: add go trace information for tso allocation #6872

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"math"
"path"
"runtime/trace"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -1067,7 +1068,8 @@ func (am *AllocatorManager) deleteAllocatorGroup(dcLocation string) {
}

// HandleRequest forwards TSO allocation requests to correct TSO Allocators.
func (am *AllocatorManager) HandleRequest(dcLocation string, count uint32) (pdpb.Timestamp, error) {
func (am *AllocatorManager) HandleRequest(ctx context.Context, dcLocation string, count uint32) (pdpb.Timestamp, error) {
defer trace.StartRegion(ctx, "AllocatorManager.HandleRequest").End()
if dcLocation == "" {
dcLocation = GlobalDCLocation
}
Expand All @@ -1077,7 +1079,7 @@ func (am *AllocatorManager) HandleRequest(dcLocation string, count uint32) (pdpb
return pdpb.Timestamp{}, err
}

return allocatorGroup.allocator.GenerateTSO(count)
return allocatorGroup.allocator.GenerateTSO(ctx, count)
}

// ResetAllocatorGroup will reset the allocator's leadership and TSO initialized in memory.
Expand Down
23 changes: 14 additions & 9 deletions pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"runtime/trace"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -52,7 +53,7 @@ type Allocator interface {
SetTSO(tso uint64, ignoreSmaller, skipUpperBoundCheck bool) error
// GenerateTSO is used to generate a given number of TSOs.
// Make sure you have initialized the TSO allocator before calling.
GenerateTSO(count uint32) (pdpb.Timestamp, error)
GenerateTSO(ctx context.Context, count uint32) (pdpb.Timestamp, error)
// Reset is used to reset the TSO allocator.
Reset()
}
Expand Down Expand Up @@ -127,8 +128,8 @@ func (gta *GlobalTSOAllocator) getSyncRTT() int64 {
return syncRTT.(int64)
}

func (gta *GlobalTSOAllocator) estimateMaxTS(count uint32, suffixBits int) (*pdpb.Timestamp, bool, error) {
physical, logical, lastUpdateTime := gta.timestampOracle.generateTSO(int64(count), 0)
func (gta *GlobalTSOAllocator) estimateMaxTS(ctx context.Context, count uint32, suffixBits int) (*pdpb.Timestamp, bool, error) {
physical, logical, lastUpdateTime := gta.timestampOracle.generateTSO(ctx, int64(count), 0)
if physical == 0 {
return &pdpb.Timestamp{}, false, errs.ErrGenerateTimestamp.FastGenByArgs("timestamp in memory isn't initialized")
}
Expand Down Expand Up @@ -178,7 +179,8 @@ func (gta *GlobalTSOAllocator) SetTSO(tso uint64, ignoreSmaller, skipUpperBoundC
// 1. Collect the max Local TSO from all Local TSO Allocator leaders and write it back to them as MaxTS.
// 2. Estimate a MaxTS and try to write it to all Local TSO Allocator leaders directly to reduce the RTT.
// During the process, if the estimated MaxTS is not accurate, it will fallback to the collecting way.
func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error) {
func (gta *GlobalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (pdpb.Timestamp, error) {
defer trace.StartRegion(ctx, "GlobalTSOAllocator.GenerateTSO").End()
if !gta.member.GetLeadership().Check() {
tsoCounter.WithLabelValues("not_leader", gta.timestampOracle.dcLocation).Inc()
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested pd %s of cluster", errs.NotLeaderErr))
Expand All @@ -188,9 +190,10 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error)
// No dc-locations configured in the cluster, use the normal Global TSO generation way.
// (without synchronization with other Local TSO Allocators)
if len(dcLocationMap) == 0 {
return gta.timestampOracle.getTS(gta.member.GetLeadership(), count, 0)
return gta.timestampOracle.getTS(ctx, gta.member.GetLeadership(), count, 0)
}

ctx1 := ctx
// Have dc-locations configured in the cluster, use the Global TSO generation way.
// (whit synchronization with other Local TSO Allocators)
ctx, cancel := context.WithCancel(gta.ctx)
Expand All @@ -205,7 +208,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error)
)
// TODO: add a switch to control whether to enable the MaxTSO estimation.
// 1. Estimate a MaxTS among all Local TSO Allocator leaders according to the RTT.
estimatedMaxTSO, shouldRetry, err = gta.estimateMaxTS(count, suffixBits)
estimatedMaxTSO, shouldRetry, err = gta.estimateMaxTS(ctx1, count, suffixBits)
if err != nil {
log.Error("global tso allocator estimates MaxTS failed", errs.ZapError(err))
continue
Expand Down Expand Up @@ -243,14 +246,14 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error)
}
// 4. Persist MaxTS into memory, and etcd if needed
var currentGlobalTSO *pdpb.Timestamp
if currentGlobalTSO, err = gta.getCurrentTSO(); err != nil {
if currentGlobalTSO, err = gta.getCurrentTSO(ctx1); err != nil {
log.Error("global tso allocator gets the current global tso in memory failed", errs.ZapError(err))
continue
}
if tsoutil.CompareTimestamp(currentGlobalTSO, &globalTSOResp) < 0 {
tsoCounter.WithLabelValues("global_tso_persist", gta.timestampOracle.dcLocation).Inc()
// Update the Global TSO in memory
if err = gta.timestampOracle.resetUserTimestamp(gta.member.GetLeadership(), tsoutil.GenerateTS(&globalTSOResp), true); err != nil {
if err = gta.timestampOracle.resetUserTimestamp(ctx1, gta.member.GetLeadership(), tsoutil.GenerateTS(&globalTSOResp), true); err != nil {
tsoCounter.WithLabelValues("global_tso_persist_err", gta.timestampOracle.dcLocation).Inc()
log.Error("global tso allocator update the global tso in memory failed", errs.ZapError(err))
continue
Expand Down Expand Up @@ -317,6 +320,7 @@ func (gta *GlobalTSOAllocator) SyncMaxTS(
maxTSO *pdpb.Timestamp,
skipCheck bool,
) error {
defer trace.StartRegion(ctx, "GlobalTSOAllocator.SyncMaxTS").End()
originalMaxTSO := *maxTSO
for i := 0; i < syncMaxRetryCount; i++ {
// Collect all allocator leaders' client URLs
Expand Down Expand Up @@ -460,7 +464,8 @@ func (gta *GlobalTSOAllocator) checkSyncedDCs(dcLocationMap map[string]DCLocatio
return len(unsyncedDCs) == 0, unsyncedDCs
}

func (gta *GlobalTSOAllocator) getCurrentTSO() (*pdpb.Timestamp, error) {
func (gta *GlobalTSOAllocator) getCurrentTSO(ctx context.Context) (*pdpb.Timestamp, error) {
defer trace.StartRegion(ctx, "GlobalTSOAllocator.getCurrentTSO").End()
currentPhysical, currentLogical := gta.timestampOracle.getTSO()
if currentPhysical == typeutil.ZeroTime {
return &pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("timestamp in memory isn't initialized")
Expand Down
6 changes: 3 additions & 3 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ func (kgm *KeyspaceGroupManager) HandleTSORequest(
if err != nil {
return pdpb.Timestamp{}, currentKeyspaceGroupID, err
}
ts, err = am.HandleRequest(dcLocation, count)
ts, err = am.HandleRequest(context.Background(), dcLocation, count)
return ts, keyspaceGroupID, err
}

Expand Down Expand Up @@ -791,11 +791,11 @@ func (kgm *KeyspaceGroupManager) checkTSOSplit(
if err != nil {
return err
}
splitTSO, err := splitAllocator.GenerateTSO(1)
splitTSO, err := splitAllocator.GenerateTSO(context.Background(), 1)
if err != nil {
return err
}
splitSourceTSO, err := splitSourceAllocator.GenerateTSO(1)
splitSourceTSO, err := splitSourceAllocator.GenerateTSO(context.Background(), 1)
if err != nil {
return err
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/tso/local_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"path"
"runtime/trace"
"sync/atomic"
"time"

Expand Down Expand Up @@ -103,13 +104,14 @@ func (lta *LocalTSOAllocator) SetTSO(tso uint64, ignoreSmaller, skipUpperBoundCh

// GenerateTSO is used to generate a given number of TSOs.
// Make sure you have initialized the TSO allocator before calling.
func (lta *LocalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error) {
func (lta *LocalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (pdpb.Timestamp, error) {
defer trace.StartRegion(ctx, "LocalTSOAllocator.GenerateTSO").End()
if !lta.leadership.Check() {
tsoCounter.WithLabelValues("not_leader", lta.timestampOracle.dcLocation).Inc()
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(
fmt.Sprintf("requested pd %s of %s allocator", errs.NotLeaderErr, lta.timestampOracle.dcLocation))
}
return lta.timestampOracle.getTS(lta.leadership, count, lta.allocatorManager.GetSuffixBits())
return lta.timestampOracle.getTS(ctx, lta.leadership, count, lta.allocatorManager.GetSuffixBits())
}

// Reset is used to reset the TSO allocator.
Expand Down Expand Up @@ -161,7 +163,7 @@ func (lta *LocalTSOAllocator) WriteTSO(maxTS *pdpb.Timestamp) error {
if tsoutil.CompareTimestamp(currentTSO, maxTS) >= 0 {
return nil
}
return lta.timestampOracle.resetUserTimestamp(lta.leadership, tsoutil.GenerateTS(maxTS), true)
return lta.timestampOracle.resetUserTimestamp(context.Background(), lta.leadership, tsoutil.GenerateTS(maxTS), true)
}

// EnableAllocatorLeader sets the Local TSO Allocator itself to a leader.
Expand Down
13 changes: 9 additions & 4 deletions pkg/tso/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
package tso

import (
"context"
"fmt"
"path"
"runtime/trace"
"sync/atomic"
"time"

Expand Down Expand Up @@ -106,7 +108,8 @@ func (t *timestampOracle) getTSO() (time.Time, int64) {
}

// generateTSO will add the TSO's logical part with the given count and returns the new TSO result.
func (t *timestampOracle) generateTSO(count int64, suffixBits int) (physical int64, logical int64, lastUpdateTime time.Time) {
func (t *timestampOracle) generateTSO(ctx context.Context, count int64, suffixBits int) (physical int64, logical int64, lastUpdateTime time.Time) {
defer trace.StartRegion(ctx, "timestampOracle.generateTSO").End()
t.tsoMux.Lock()
defer t.tsoMux.Unlock()
if t.tsoMux.physical == typeutil.ZeroTime {
Expand Down Expand Up @@ -201,7 +204,8 @@ func (t *timestampOracle) isInitialized() bool {
// When ignoreSmaller is true, resetUserTimestamp will ignore the smaller tso resetting error and do nothing.
// It's used to write MaxTS during the Global TSO synchronization without failing the writing as much as possible.
// cannot set timestamp to one which >= current + maxResetTSGap
func (t *timestampOracle) resetUserTimestamp(leadership *election.Leadership, tso uint64, ignoreSmaller bool) error {
func (t *timestampOracle) resetUserTimestamp(ctx context.Context, leadership *election.Leadership, tso uint64, ignoreSmaller bool) error {
defer trace.StartRegion(ctx, "timestampOracle.resetUserTimestamp").End()
return t.resetUserTimestampInner(leadership, tso, ignoreSmaller, false)
}

Expand Down Expand Up @@ -328,7 +332,8 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error
var maxRetryCount = 10

// getTS is used to get a timestamp.
func (t *timestampOracle) getTS(leadership *election.Leadership, count uint32, suffixBits int) (pdpb.Timestamp, error) {
func (t *timestampOracle) getTS(ctx context.Context, leadership *election.Leadership, count uint32, suffixBits int) (pdpb.Timestamp, error) {
defer trace.StartRegion(ctx, "timestampOracle.getTS").End()
var resp pdpb.Timestamp
if count == 0 {
return resp, errs.ErrGenerateTimestamp.FastGenByArgs("tso count should be positive")
Expand All @@ -345,7 +350,7 @@ func (t *timestampOracle) getTS(leadership *election.Leadership, count uint32, s
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("timestamp in memory isn't initialized")
}
// Get a new TSO result with the given count
resp.Physical, resp.Logical, _ = t.generateTSO(int64(count), suffixBits)
resp.Physical, resp.Logical, _ = t.generateTSO(ctx, int64(count), suffixBits)
if resp.GetPhysical() == 0 {
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("timestamp in memory has been reset")
}
Expand Down
9 changes: 6 additions & 3 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"io"
"path"
"runtime/trace"
"strconv"
"sync/atomic"
"time"
Expand Down Expand Up @@ -237,7 +238,9 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, request.GetHeader().GetClusterId())
}
count := request.GetCount()
ts, err := s.tsoAllocatorManager.HandleRequest(request.GetDcLocation(), count)
ctx, task := trace.NewTask(ctx, "Tso")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about "tso"?

ts, err := s.tsoAllocatorManager.HandleRequest(ctx, request.GetDcLocation(), count)
task.End()
if err != nil {
return status.Errorf(codes.Unknown, err.Error())
}
Expand Down Expand Up @@ -1330,7 +1333,7 @@ func (s *GrpcServer) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb
if s.IsAPIServiceMode() {
nowTSO, err = s.getGlobalTSOFromTSOServer(ctx)
} else {
nowTSO, err = s.tsoAllocatorManager.HandleRequest(tso.GlobalDCLocation, 1)
nowTSO, err = s.tsoAllocatorManager.HandleRequest(ctx, tso.GlobalDCLocation, 1)
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -2035,7 +2038,7 @@ func (s *GrpcServer) SetExternalTimestamp(ctx context.Context, request *pdpb.Set
if s.IsAPIServiceMode() {
nowTSO, err = s.getGlobalTSOFromTSOServer(ctx)
} else {
nowTSO, err = s.tsoAllocatorManager.HandleRequest(tso.GlobalDCLocation, 1)
nowTSO, err = s.tsoAllocatorManager.HandleRequest(ctx, tso.GlobalDCLocation, 1)
}
if err != nil {
return nil, err
Expand Down
Loading