Skip to content

Commit

Permalink
Merge branch 'master' into metrics3
Browse files Browse the repository at this point in the history
  • Loading branch information
HuSharp authored Mar 6, 2024
2 parents ac89811 + 6a45734 commit a750fe8
Show file tree
Hide file tree
Showing 14 changed files with 111 additions and 109 deletions.
64 changes: 32 additions & 32 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,10 +773,10 @@ func (c *client) GetTSAsync(ctx context.Context) TSFuture {
}

func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFuture {
defer trace.StartRegion(ctx, "GetLocalTSAsync").End()
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("GetLocalTSAsync", opentracing.ChildOf(span.Context()))
ctx = opentracing.ContextWithSpan(ctx, span)
defer trace.StartRegion(ctx, "pdclient.GetLocalTSAsync").End()
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetLocalTSAsync", opentracing.ChildOf(span.Context()))
defer span.Finish()
}

req := tsoReqPool.Get().(*tsoRequest)
Expand Down Expand Up @@ -875,8 +875,8 @@ func handleRegionResponse(res *pdpb.GetRegionResponse) *Region {
}

func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetRegionFromMember", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetRegionFromMember", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -913,8 +913,8 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs
}

func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -951,8 +951,8 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOpt
}

func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetPrevRegion", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetPrevRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -989,8 +989,8 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegio
}

func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetRegionByID", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetRegionByID", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -1027,8 +1027,8 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get
}

func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...GetRegionOption) ([]*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.ScanRegions", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.ScanRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -1102,8 +1102,8 @@ func handleRegionsResponse(resp *pdpb.ScanRegionsResponse) []*Region {
}

func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetStore", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetStore", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -1146,8 +1146,8 @@ func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*m
opt(options)
}

if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetAllStores", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetAllStores", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand All @@ -1173,8 +1173,8 @@ func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*m
}

func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.UpdateGCSafePoint", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.UpdateGCSafePoint", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -1204,8 +1204,8 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6
// determine the safepoint for multiple services, it does not trigger a GC
// job. Use UpdateGCSafePoint to trigger the GC job if needed.
func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.UpdateServiceGCSafePoint", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.UpdateServiceGCSafePoint", opentracing.ChildOf(span.Context()))
defer span.Finish()
}

Expand Down Expand Up @@ -1234,8 +1234,8 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string,
}

func (c *client) ScatterRegion(ctx context.Context, regionID uint64) error {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.ScatterRegion", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.ScatterRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
return c.scatterRegionsWithGroup(ctx, regionID, "")
Expand Down Expand Up @@ -1268,16 +1268,16 @@ func (c *client) scatterRegionsWithGroup(ctx context.Context, regionID uint64, g
}

func (c *client) ScatterRegions(ctx context.Context, regionsID []uint64, opts ...RegionsOption) (*pdpb.ScatterRegionResponse, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.ScatterRegions", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.ScatterRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
return c.scatterRegionsWithOptions(ctx, regionsID, opts...)
}

func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte, opts ...RegionsOption) (*pdpb.SplitAndScatterRegionsResponse, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.SplitAndScatterRegions", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.SplitAndScatterRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand All @@ -1304,8 +1304,8 @@ func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte,
}

func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetOperator", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetOperator", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand All @@ -1327,8 +1327,8 @@ func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOpe

// SplitRegions split regions by given split keys
func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...RegionsOption) (*pdpb.SplitRegionsResponse, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.SplitRegions", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.SplitRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down
8 changes: 4 additions & 4 deletions client/gc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ type GCClient interface {

// UpdateGCSafePointV2 update gc safe point for the given keyspace.
func (c *client) UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, safePoint uint64) (uint64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.UpdateGCSafePointV2", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.UpdateGCSafePointV2", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -63,8 +63,8 @@ func (c *client) UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, saf

// UpdateServiceSafePointV2 update service safe point for the given keyspace.
func (c *client) UpdateServiceSafePointV2(ctx context.Context, keyspaceID uint32, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.UpdateServiceSafePointV2", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.UpdateServiceSafePointV2", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down
2 changes: 1 addition & 1 deletion client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func RegionByKey(key []byte) string {
// RegionsByKeyRange returns the path of PD HTTP API to scan regions with given start key, end key and limit parameters.
func RegionsByKeyRange(keyRange *KeyRange, limit int) string {
startKeyStr, endKeyStr := keyRange.EscapeAsUTF8Str()
return fmt.Sprintf("%s?start_key=%s&end_key=%s&limit=%d",
return fmt.Sprintf("%s?key=%s&end_key=%s&limit=%d",
regionsByKey, startKeyStr, endKeyStr, limit)
}

Expand Down
12 changes: 6 additions & 6 deletions client/keyspace_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func (c *client) keyspaceClient() keyspacepb.KeyspaceClient {

// LoadKeyspace loads and returns target keyspace's metadata.
func (c *client) LoadKeyspace(ctx context.Context, name string) (*keyspacepb.KeyspaceMeta, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("keyspaceClient.LoadKeyspace", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("keyspaceClient.LoadKeyspace", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -84,8 +84,8 @@ func (c *client) LoadKeyspace(ctx context.Context, name string) (*keyspacepb.Key
//
// Updated keyspace meta will be returned.
func (c *client) UpdateKeyspaceState(ctx context.Context, id uint32, state keyspacepb.KeyspaceState) (*keyspacepb.KeyspaceMeta, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("keyspaceClient.UpdateKeyspaceState", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("keyspaceClient.UpdateKeyspaceState", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -123,8 +123,8 @@ func (c *client) WatchKeyspaces(ctx context.Context) (chan []*keyspacepb.Keyspac

// GetAllKeyspaces get all keyspaces metadata.
func (c *client) GetAllKeyspaces(ctx context.Context, startID uint32, limit uint32) ([]*keyspacepb.KeyspaceMeta, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("keyspaceClient.GetAllKeyspaces", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("keyspaceClient.GetAllKeyspaces", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down
8 changes: 4 additions & 4 deletions client/meta_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ func (c *client) Put(ctx context.Context, key, value []byte, opts ...OpOption) (
opt(options)
}

if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.Put", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.Put", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -148,8 +148,8 @@ func (c *client) Get(ctx context.Context, key []byte, opts ...OpOption) (*meta_s
options.rangeEnd = getPrefix(key)
}

if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.Get", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.Get", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down
4 changes: 3 additions & 1 deletion client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,9 @@ func (c *pdServiceDiscovery) checkServiceModeChanged() error {
// If the method is not supported, we set it to pd mode.
// TODO: it's a hack way to solve the compatibility issue.
// we need to remove this after all maintained version supports the method.
c.serviceModeUpdateCb(pdpb.ServiceMode_PD_SVC_MODE)
if c.serviceModeUpdateCb != nil {
c.serviceModeUpdateCb(pdpb.ServiceMode_PD_SVC_MODE)
}
return nil
}
return err
Expand Down
35 changes: 10 additions & 25 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (c *tsoClient) dispatchRequest(ctx context.Context, dcLocation string, requ
return err
}

defer trace.StartRegion(request.requestCtx, "tsoReqEnqueue").End()
defer trace.StartRegion(request.requestCtx, "pdclient.tsoReqEnqueue").End()
select {
case <-ctx.Done():
return ctx.Err()
Expand All @@ -104,7 +104,7 @@ func (req *tsoRequest) Wait() (physical int64, logical int64, err error) {
cmdDurationTSOAsyncWait.Observe(start.Sub(req.start).Seconds())
select {
case err = <-req.done:
defer trace.StartRegion(req.requestCtx, "tsoReqDone").End()
defer trace.StartRegion(req.requestCtx, "pdclient.tsoReqDone").End()
err = errors.WithStack(err)
defer tsoReqPool.Put(req)
if err != nil {
Expand Down Expand Up @@ -350,7 +350,6 @@ func (c *tsoClient) handleDispatcher(
cancel context.CancelFunc
// addr -> connectionContext
connectionCtxs sync.Map
opts []opentracing.StartSpanOption
)
defer func() {
log.Info("[tso] exit tso dispatcher", zap.String("dc-location", dc))
Expand Down Expand Up @@ -499,8 +498,7 @@ tsoBatchLoop:
return
case tsDeadlineCh.(chan *deadline) <- dl:
}
opts = extractSpanReference(tbc, opts[:0])
err = c.processRequests(stream, dc, tbc, opts)
err = c.processRequests(stream, dc, tbc)
close(done)
// If error happens during tso stream handling, reset stream and run the next trial.
if err != nil {
Expand Down Expand Up @@ -758,26 +756,16 @@ func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc s
return nil
}

func extractSpanReference(tbc *tsoBatchController, opts []opentracing.StartSpanOption) []opentracing.StartSpanOption {
for _, req := range tbc.getCollectedRequests() {
if span := opentracing.SpanFromContext(req.requestCtx); span != nil {
opts = append(opts, opentracing.ChildOf(span.Context()))
}
}
return opts
}

func (c *tsoClient) processRequests(
stream tsoStream, dcLocation string, tbc *tsoBatchController, opts []opentracing.StartSpanOption,
stream tsoStream, dcLocation string, tbc *tsoBatchController,
) error {
if len(opts) > 0 {
span := opentracing.StartSpan("pdclient.processRequests", opts...)
defer span.Finish()
}

requests := tbc.getCollectedRequests()
for _, req := range requests {
defer trace.StartRegion(req.requestCtx, "tsoReqSend").End()
defer trace.StartRegion(req.requestCtx, "pdclient.tsoReqSend").End()
if span := opentracing.SpanFromContext(req.requestCtx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.processRequests", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
}
count := int64(len(requests))
reqKeyspaceGroupID := c.svcDiscovery.GetKeyspaceGroupID()
Expand Down Expand Up @@ -849,11 +837,8 @@ func (c *tsoClient) compareAndSwapTS(

func (c *tsoClient) finishRequest(requests []*tsoRequest, physical, firstLogical int64, suffixBits uint32, err error) {
for i := 0; i < len(requests); i++ {
if span := opentracing.SpanFromContext(requests[i].requestCtx); span != nil {
span.Finish()
}
requests[i].physical, requests[i].logical = physical, tsoutil.AddLogical(firstLogical, int64(i), suffixBits)
defer trace.StartRegion(requests[i].requestCtx, "tsoReqDequeue").End()
defer trace.StartRegion(requests[i].requestCtx, "pdclient.tsoReqDequeue").End()
requests[i].done <- err
}
}
10 changes: 6 additions & 4 deletions pkg/storage/leveldb_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/testutil"
)

func TestLevelDBBackend(t *testing.T) {
Expand Down Expand Up @@ -86,10 +87,11 @@ func TestLevelDBBackend(t *testing.T) {
val, err = backend.Load(key)
re.NoError(err)
re.Empty(val)
time.Sleep(defaultDirtyFlushTick * 2)
val, err = backend.Load(key)
re.NoError(err)
re.Equal(value, val)
testutil.Eventually(re, func() bool {
val, err = backend.Load(key)
re.NoError(err)
return value == val
}, testutil.WithWaitFor(defaultDirtyFlushTick*5), testutil.WithTickInterval(defaultDirtyFlushTick/2))
err = backend.Remove(key)
re.NoError(err)
val, err = backend.Load(key)
Expand Down
13 changes: 9 additions & 4 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,9 +1005,12 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
saveKV, saveCache, needSync := regionGuide(region, origin)
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) && !saveKV && !saveCache {
if !saveKV && !saveCache {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
// TODO: Due to the accuracy requirements of the API "/regions/check/xxx",
// region stats needs to be collected in API mode.
// We need to think of a better way to reduce this part of the cost in the future.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
c.regionStats.Observe(region, c.getRegionStoresLocked(region))
}
Expand Down Expand Up @@ -1035,9 +1038,11 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
}
regionUpdateCacheEventCounter.Inc()
}
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats)
}

// TODO: Due to the accuracy requirements of the API "/regions/check/xxx",
// region stats needs to be collected in API mode.
// We need to think of a better way to reduce this part of the cost in the future.
cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats)

if c.storage != nil {
// If there are concurrent heartbeats from the same region, the last write will win even if
Expand Down
Loading

0 comments on commit a750fe8

Please sign in to comment.