diff --git a/client/client.go b/client/client.go
index a9f5a6fae0f..bdde1a1b675 100644
--- a/client/client.go
+++ b/client/client.go
@@ -34,6 +34,7 @@ import (
 	"github.com/tikv/pd/client/caller"
 	"github.com/tikv/pd/client/clients/metastorage"
 	"github.com/tikv/pd/client/errs"
+	"github.com/tikv/pd/client/metrics"
 	"github.com/tikv/pd/client/opt"
 	"github.com/tikv/pd/client/utils/tlsutil"
 	"go.uber.org/zap"
@@ -555,7 +556,7 @@ func (c *client) UpdateOption(option opt.DynamicOption, value any) error {
 // GetAllMembers gets the members Info from PD.
 func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {
 	start := time.Now()
-	defer func() { cmdDurationGetAllMembers.Observe(time.Since(start).Seconds()) }()
+	defer func() { metrics.CmdDurationGetAllMembers.Observe(time.Since(start).Seconds()) }()
 
 	ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
 	defer cancel()
@@ -565,7 +566,7 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {
 		return nil, errs.ErrClientGetProtoClient
 	}
 	resp, err := protoClient.GetMembers(ctx, req)
-	if err = c.respForErr(cmdFailDurationGetAllMembers, start, err, resp.GetHeader()); err != nil {
+	if err = c.respForErr(metrics.CmdFailedDurationGetAllMembers, start, err, resp.GetHeader()); err != nil {
 		return nil, err
 	}
 	return resp.GetMembers(), nil
@@ -683,7 +684,7 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs
 		defer span.Finish()
 	}
 	start := time.Now()
-	defer func() { cmdDurationGetRegion.Observe(time.Since(start).Seconds()) }()
+	defer func() { metrics.CmdDurationGetRegion.Observe(time.Since(start).Seconds()) }()
 
 	var resp *pdpb.GetRegionResponse
 	for _, url := range memberURLs {
@@ -707,7 +708,7 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs
 	}
 
 	if resp == nil {
-		cmdFailDurationGetRegion.Observe(time.Since(start).Seconds())
+		metrics.CmdFailedDurationGetRegion.Observe(time.Since(start).Seconds())
 		c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged()
 		errorMsg := fmt.Sprintf("[pd] can't get region info from member URLs: %+v", memberURLs)
 		return nil, errors.WithStack(errors.New(errorMsg))
@@ -722,7 +723,7 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegio
 		defer span.Finish()
 	}
 	start := time.Now()
-	defer func() { cmdDurationGetRegion.Observe(time.Since(start).Seconds()) }()
+	defer func() { metrics.CmdDurationGetRegion.Observe(time.Since(start).Seconds()) }()
 
 	ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
 	defer cancel()
@@ -749,7 +750,7 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegio
 		resp, err = protoClient.GetRegion(cctx, req)
 	}
 
-	if err = c.respForErr(cmdFailDurationGetRegion, start, err, resp.GetHeader()); err != nil {
+	if err = c.respForErr(metrics.CmdFailedDurationGetRegion, start, err, resp.GetHeader()); err != nil {
 		return nil, err
 	}
 	return handleRegionResponse(resp), nil
@@ -762,7 +763,7 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetR
 		defer span.Finish()
 	}
 	start := time.Now()
-	defer func() { cmdDurationGetPrevRegion.Observe(time.Since(start).Seconds()) }()
+	defer func() { metrics.CmdDurationGetPrevRegion.Observe(time.Since(start).Seconds()) }()
 
 	ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
 	defer cancel()
@@ -789,7 +790,7 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetR
 		resp, err = protoClient.GetPrevRegion(cctx, req)
 	}
 
-	if err = c.respForErr(cmdFailDurationGetPrevRegion, start, err, resp.GetHeader()); err != nil {
+	if err = c.respForErr(metrics.CmdFailedDurationGetPrevRegion, start, err, resp.GetHeader()); err != nil {
 		return nil, err
 	}
 	return handleRegionResponse(resp), nil
@@ -802,7 +803,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt
 		defer span.Finish()
 	}
 	start := time.Now()
-	defer func() { cmdDurationGetRegionByID.Observe(time.Since(start).Seconds()) }()
+	defer func() { metrics.CmdDurationGetRegionByID.Observe(time.Since(start).Seconds()) }()
 
 	ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
 	defer cancel()
@@ -829,7 +830,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt
 		resp, err = protoClient.GetRegionByID(cctx, req)
 	}
 
-	if err = c.respForErr(cmdFailedDurationGetRegionByID, start, err, resp.GetHeader()); err != nil {
+	if err = c.respForErr(metrics.CmdFailedDurationGetRegionByID, start, err, resp.GetHeader()); err != nil {
 		return nil, err
 	}
 	return handleRegionResponse(resp), nil
@@ -842,7 +843,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int,
 		defer span.Finish()
 	}
 	start := time.Now()
-	defer func() { cmdDurationScanRegions.Observe(time.Since(start).Seconds()) }()
+	defer func() { metrics.CmdDurationScanRegions.Observe(time.Since(start).Seconds()) }()
 
 	var cancel context.CancelFunc
 	scanCtx := ctx
@@ -879,7 +880,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int,
 		resp, err = protoClient.ScanRegions(cctx, req)
 	}
 
-	if err = c.respForErr(cmdFailedDurationScanRegions, start, err, resp.GetHeader()); err != nil {
+	if err = c.respForErr(metrics.CmdFailedDurationScanRegions, start, err, resp.GetHeader()); err != nil {
 		return nil, err
 	}
 
@@ -893,7 +894,7 @@ func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit
 		defer span.Finish()
 	}
 	start := time.Now()
-	defer func() { cmdDurationBatchScanRegions.Observe(time.Since(start).Seconds()) }()
+	defer func() { metrics.CmdDurationBatchScanRegions.Observe(time.Since(start).Seconds()) }()
 
 	var cancel context.CancelFunc
 	scanCtx := ctx
@@ -933,7 +934,7 @@ func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit
 		resp, err = protoClient.BatchScanRegions(cctx, req)
 	}
 
-	if err = c.respForErr(cmdFailedDurationBatchScanRegions, start, err, resp.GetHeader()); err != nil {
+	if err = c.respForErr(metrics.CmdFailedDurationBatchScanRegions, start, err, resp.GetHeader()); err != nil {
 		return nil, err
 	}
 
@@ -993,7 +994,7 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, e
 		defer span.Finish()
 	}
 	start := time.Now()
-	defer func() { cmdDurationGetStore.Observe(time.Since(start).Seconds()) }()
+	defer func() { metrics.CmdDurationGetStore.Observe(time.Since(start).Seconds()) }()
 
 	ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
 	defer cancel()
@@ -1007,7 +1008,7 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, e
 	}
 	resp, err := protoClient.GetStore(ctx, req)
 
-	if err = c.respForErr(cmdFailedDurationGetStore, start, err, resp.GetHeader()); err != nil {
+	if err = c.respForErr(metrics.CmdFailedDurationGetStore, start, err, resp.GetHeader()); err != nil {
 		return nil, err
 	}
 	return handleStoreResponse(resp)
@@ -1037,7 +1038,7 @@ func (c *client) GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) (
 		defer span.Finish()
 	}
 	start := time.Now()
-	defer func() { cmdDurationGetAllStores.Observe(time.Since(start).Seconds()) }()
+	defer func() { metrics.CmdDurationGetAllStores.Observe(time.Since(start).Seconds()) }()
 
 	ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
 	defer cancel()
@@ -1051,7 +1052,7 @@ func (c *client) GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) (
 	}
 	resp, err := protoClient.GetAllStores(ctx, req)
 
-	if err = c.respForErr(cmdFailedDurationGetAllStores, start, err, resp.GetHeader()); err != nil {
+	if err = c.respForErr(metrics.CmdFailedDurationGetAllStores, start, err, resp.GetHeader()); err != nil {
 		return nil, err
 	}
 	return resp.GetStores(), nil
@@ -1064,7 +1065,7 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6
 		defer span.Finish()
 	}
 	start := time.Now()
-	defer func() { cmdDurationUpdateGCSafePoint.Observe(time.Since(start).Seconds()) }()
+	defer func() { metrics.CmdDurationUpdateGCSafePoint.Observe(time.Since(start).Seconds()) }()
 
 	ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
 	defer cancel()
@@ -1078,7 +1079,7 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6
 	}
 	resp, err := protoClient.UpdateGCSafePoint(ctx, req)
 
-	if err = c.respForErr(cmdFailedDurationUpdateGCSafePoint, start, err, resp.GetHeader()); err != nil {
+	if err = c.respForErr(metrics.CmdFailedDurationUpdateGCSafePoint, start, err, resp.GetHeader()); err != nil {
 		return 0, err
 	}
 	return resp.GetNewSafePoint(), nil
@@ -1095,7 +1096,7 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string,
 	}
 
 	start := time.Now()
-	defer func() { cmdDurationUpdateServiceGCSafePoint.Observe(time.Since(start).Seconds()) }()
+	defer func() { metrics.CmdDurationUpdateServiceGCSafePoint.Observe(time.Since(start).Seconds()) }()
 
 	ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
 	defer cancel()
@@ -1111,7 +1112,7 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string,
 	}
 	resp, err := protoClient.UpdateServiceGCSafePoint(ctx, req)
 
-	if err = c.respForErr(cmdFailedDurationUpdateServiceGCSafePoint, start, err, resp.GetHeader()); err != nil {
+	if err = c.respForErr(metrics.CmdFailedDurationUpdateServiceGCSafePoint, start, err, resp.GetHeader()); err != nil {
 		return 0, err
 	}
 	return resp.GetMinSafePoint(), nil
@@ -1128,7 +1129,7 @@ func (c *client) ScatterRegion(ctx context.Context, regionID uint64) error {
 
 func (c *client) scatterRegionsWithGroup(ctx context.Context, regionID uint64, group string) error {
 	start := time.Now()
-	defer func() { cmdDurationScatterRegion.Observe(time.Since(start).Seconds()) }()
+	defer func() { metrics.CmdDurationScatterRegion.Observe(time.Since(start).Seconds()) }()
 
 	ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
 	defer cancel()
@@ -1167,7 +1168,7 @@ func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte,
 		defer span.Finish()
 	}
 	start := time.Now()
-	defer func() { cmdDurationSplitAndScatterRegions.Observe(time.Since(start).Seconds()) }()
+	defer func() { metrics.CmdDurationSplitAndScatterRegions.Observe(time.Since(start).Seconds()) }()
 	ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
 	defer cancel()
 	options := &opt.RegionsOp{}
@@ -1195,7 +1196,7 @@ func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOpe
 		defer span.Finish()
 	}
 	start := time.Now()
-	defer func() { cmdDurationGetOperator.Observe(time.Since(start).Seconds()) }()
+	defer func() { metrics.CmdDurationGetOperator.Observe(time.Since(start).Seconds()) }()
 
 	ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
 	defer cancel()
@@ -1217,7 +1218,7 @@ func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...o
 		defer span.Finish()
 	}
 	start := time.Now()
-	defer func() { cmdDurationSplitRegions.Observe(time.Since(start).Seconds()) }()
+	defer func() { metrics.CmdDurationSplitRegions.Observe(time.Since(start).Seconds()) }()
 	ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
 	defer cancel()
 	options := &opt.RegionsOp{}
@@ -1246,7 +1247,7 @@ func (c *client) requestHeader() *pdpb.RequestHeader {
 
 func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint64, opts ...opt.RegionsOption) (*pdpb.ScatterRegionResponse, error) {
 	start := time.Now()
-	defer func() { cmdDurationScatterRegions.Observe(time.Since(start).Seconds()) }()
+	defer func() { metrics.CmdDurationScatterRegions.Observe(time.Since(start).Seconds()) }()
 	options := &opt.RegionsOp{}
 	for _, opt := range opts {
 		opt(options)
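Every hunk above follows the same instrumentation idiom: observe the success histogram in a deferred closure, and let `respForErr` record into the failure histogram when the RPC errors. A minimal, self-contained sketch of that idiom follows; `instrument`, `okDuration`, and `failDuration` are hypothetical names for illustration, not PD client API:

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	okDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name: "handle_cmds_duration_seconds", Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
	}, []string{"type"})
	failDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name: "handle_failed_cmds_duration_seconds", Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
	}, []string{"type"})
)

// instrument times fn: the success histogram always sees the total latency
// (via defer), and the failure histogram additionally sees it on error,
// mirroring how client.go pairs CmdDurationXXX with CmdFailedDurationXXX.
func instrument(cmd string, fn func() error) error {
	start := time.Now()
	defer func() { okDuration.WithLabelValues(cmd).Observe(time.Since(start).Seconds()) }()
	if err := fn(); err != nil {
		failDuration.WithLabelValues(cmd).Observe(time.Since(start).Seconds())
		return err
	}
	return nil
}

func main() {
	_ = instrument("get_region", func() error { time.Sleep(time.Millisecond); return nil })
	fmt.Println("observed")
}
```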
diff --git a/client/gc_client.go b/client/gc_client.go
index 2b64cb91c4a..f30521905c3 100644
--- a/client/gc_client.go
+++ b/client/gc_client.go
@@ -22,6 +22,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/client/errs"
+	"github.com/tikv/pd/client/metrics"
 	"go.uber.org/zap"
 )
@@ -39,7 +40,7 @@ func (c *client) UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, saf
 		defer span.Finish()
 	}
 	start := time.Now()
-	defer func() { cmdDurationUpdateGCSafePointV2.Observe(time.Since(start).Seconds()) }()
+	defer func() { metrics.CmdDurationUpdateGCSafePointV2.Observe(time.Since(start).Seconds()) }()
 
 	ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
 	req := &pdpb.UpdateGCSafePointV2Request{
@@ -55,7 +56,7 @@ func (c *client) UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, saf
 	resp, err := protoClient.UpdateGCSafePointV2(ctx, req)
 	cancel()
 
-	if err = c.respForErr(cmdFailedDurationUpdateGCSafePointV2, start, err, resp.GetHeader()); err != nil {
+	if err = c.respForErr(metrics.CmdFailedDurationUpdateGCSafePointV2, start, err, resp.GetHeader()); err != nil {
 		return 0, err
 	}
 	return resp.GetNewSafePoint(), nil
@@ -68,7 +69,7 @@ func (c *client) UpdateServiceSafePointV2(ctx context.Context, keyspaceID uint32
 		defer span.Finish()
 	}
 	start := time.Now()
-	defer func() { cmdDurationUpdateServiceSafePointV2.Observe(time.Since(start).Seconds()) }()
+	defer func() { metrics.CmdDurationUpdateServiceSafePointV2.Observe(time.Since(start).Seconds()) }()
 
 	ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
 	req := &pdpb.UpdateServiceSafePointV2Request{
@@ -85,7 +86,7 @@ func (c *client) UpdateServiceSafePointV2(ctx context.Context, keyspaceID uint32
 	}
 	resp, err := protoClient.UpdateServiceSafePointV2(ctx, req)
 	cancel()
-	if err = c.respForErr(cmdFailedDurationUpdateServiceSafePointV2, start, err, resp.GetHeader()); err != nil {
+	if err = c.respForErr(metrics.CmdFailedDurationUpdateServiceSafePointV2, start, err, resp.GetHeader()); err != nil {
 		return 0, err
 	}
 	return resp.GetMinSafePoint(), nil
diff --git a/client/inner_client.go b/client/inner_client.go
index 47acda56e42..62fcd84dd5d 100644
--- a/client/inner_client.go
+++ b/client/inner_client.go
@@ -10,6 +10,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/client/errs"
+	"github.com/tikv/pd/client/metrics"
 	"github.com/tikv/pd/client/opt"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
@@ -160,7 +161,7 @@ func (c *innerClient) close() {
 func (c *innerClient) setup() error {
 	// Init the metrics.
 	if c.option.InitMetrics {
-		initAndRegisterMetrics(c.option.MetricsLabels)
+		metrics.InitAndRegisterMetrics(c.option.MetricsLabels)
 	}
 
 	// Init the client base.
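`innerClient.setup` is the call site that makes the previously package-private initializer reachable: `metrics.InitAndRegisterMetrics` is now exported, so an embedding application can also call it directly. A minimal sketch of that, assuming the default Prometheus registry is scraped via `promhttp` (the `component` label value and port are arbitrary examples):

```go
package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/tikv/pd/client/metrics"
)

func main() {
	// Attach const labels to every pd_client_* series and register them with
	// the default registry. Calling this more than once is safe: the CAS
	// guard inside turns later calls into no-ops.
	metrics.InitAndRegisterMetrics(prometheus.Labels{"component": "example-app"})

	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe(":9090", nil)
}
```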
diff --git a/client/keyspace_client.go b/client/keyspace_client.go
index 3f8cea993c0..ce0cc0bc426 100644
--- a/client/keyspace_client.go
+++ b/client/keyspace_client.go
@@ -22,6 +22,7 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/keyspacepb"
 	"github.com/tikv/pd/client/errs"
+	"github.com/tikv/pd/client/metrics"
 )
 
 // KeyspaceClient manages keyspace metadata.
@@ -51,7 +52,7 @@ func (c *client) LoadKeyspace(ctx context.Context, name string) (*keyspacepb.Key
 		defer span.Finish()
 	}
 	start := time.Now()
-	defer func() { cmdDurationLoadKeyspace.Observe(time.Since(start).Seconds()) }()
+	defer func() { metrics.CmdDurationLoadKeyspace.Observe(time.Since(start).Seconds()) }()
 	ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
 	req := &keyspacepb.LoadKeyspaceRequest{
 		Header: c.requestHeader(),
@@ -66,13 +67,13 @@ func (c *client) LoadKeyspace(ctx context.Context, name string) (*keyspacepb.Key
 	cancel()
 
 	if err != nil {
-		cmdFailedDurationLoadKeyspace.Observe(time.Since(start).Seconds())
+		metrics.CmdFailedDurationLoadKeyspace.Observe(time.Since(start).Seconds())
 		c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged()
 		return nil, err
 	}
 
 	if resp.Header.GetError() != nil {
-		cmdFailedDurationLoadKeyspace.Observe(time.Since(start).Seconds())
+		metrics.CmdFailedDurationLoadKeyspace.Observe(time.Since(start).Seconds())
 		return nil, errors.Errorf("Load keyspace %s failed: %s", name, resp.Header.GetError().String())
 	}
 
@@ -95,7 +96,7 @@ func (c *client) UpdateKeyspaceState(ctx context.Context, id uint32, state keysp
 		defer span.Finish()
 	}
 	start := time.Now()
-	defer func() { cmdDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds()) }()
+	defer func() { metrics.CmdDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds()) }()
 	ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
 	req := &keyspacepb.UpdateKeyspaceStateRequest{
 		Header: c.requestHeader(),
@@ -111,13 +112,13 @@ func (c *client) UpdateKeyspaceState(ctx context.Context, id uint32, state keysp
 	cancel()
 
 	if err != nil {
-		cmdFailedDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds())
+		metrics.CmdFailedDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds())
 		c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged()
 		return nil, err
 	}
 
 	if resp.Header.GetError() != nil {
-		cmdFailedDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds())
+		metrics.CmdFailedDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds())
 		return nil, errors.Errorf("Update state for keyspace id %d failed: %s", id, resp.Header.GetError().String())
 	}
 
@@ -139,7 +140,7 @@ func (c *client) GetAllKeyspaces(ctx context.Context, startID uint32, limit uint
 		defer span.Finish()
 	}
 	start := time.Now()
-	defer func() { cmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds()) }()
+	defer func() { metrics.CmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds()) }()
 	ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
 	req := &keyspacepb.GetAllKeyspacesRequest{
 		Header: c.requestHeader(),
@@ -155,13 +156,13 @@ func (c *client) GetAllKeyspaces(ctx context.Context, startID uint32, limit uint
 	cancel()
 
 	if err != nil {
-		cmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds())
+		metrics.CmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds())
 		c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged()
 		return nil, err
 	}
 
 	if resp.Header.GetError() != nil {
-		cmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds())
+		metrics.CmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds())
 		return nil, errors.Errorf("Get all keyspaces metadata failed: %s", resp.Header.GetError().String())
 	}
diff --git a/client/meta_storage_client.go b/client/meta_storage_client.go
index 6409b6e7a46..e73982f981d 100644
--- a/client/meta_storage_client.go
+++ b/client/meta_storage_client.go
@@ -24,6 +24,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/meta_storagepb"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/tikv/pd/client/errs"
+	"github.com/tikv/pd/client/metrics"
 	"github.com/tikv/pd/client/opt"
 	"github.com/tikv/pd/client/utils/grpcutil"
 )
@@ -62,7 +63,7 @@ func (c *innerClient) Put(ctx context.Context, key, value []byte, opts ...opt.Me
 		defer span.Finish()
 	}
 	start := time.Now()
-	defer func() { cmdDurationPut.Observe(time.Since(start).Seconds()) }()
+	defer func() { metrics.CmdDurationPut.Observe(time.Since(start).Seconds()) }()
 
 	ctx, cancel := context.WithTimeout(ctx, c.option.Timeout)
 	req := &meta_storagepb.PutRequest{
@@ -80,7 +81,7 @@ func (c *innerClient) Put(ctx context.Context, key, value []byte, opts ...opt.Me
 	resp, err := cli.Put(ctx, req)
 	cancel()
 
-	if err = c.respForMetaStorageErr(cmdFailedDurationPut, start, err, resp.GetHeader()); err != nil {
+	if err = c.respForMetaStorageErr(metrics.CmdFailedDurationPut, start, err, resp.GetHeader()); err != nil {
 		return nil, err
 	}
 	return resp, nil
@@ -101,7 +102,7 @@ func (c *innerClient) Get(ctx context.Context, key []byte, opts ...opt.MetaStora
 		defer span.Finish()
 	}
 	start := time.Now()
-	defer func() { cmdDurationGet.Observe(time.Since(start).Seconds()) }()
+	defer func() { metrics.CmdDurationGet.Observe(time.Since(start).Seconds()) }()
 
 	ctx, cancel := context.WithTimeout(ctx, c.option.Timeout)
 	req := &meta_storagepb.GetRequest{
@@ -119,7 +120,7 @@ func (c *innerClient) Get(ctx context.Context, key []byte, opts ...opt.MetaStora
 	resp, err := cli.Get(ctx, req)
 	cancel()
 
-	if err = c.respForMetaStorageErr(cmdFailedDurationGet, start, err, resp.GetHeader()); err != nil {
+	if err = c.respForMetaStorageErr(metrics.CmdFailedDurationGet, start, err, resp.GetHeader()); err != nil {
 		return nil, err
 	}
 	return resp, nil
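The `MetricsLabels` option threaded through all of these call sites ends up as Prometheus `ConstLabels`. A small sketch of what a const label does to a series, using hypothetical metric names (`demo_duration_seconds`, `cluster=test`), may help when reviewing the metrics file below:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	// ConstLabels ride along on every series the collector produces, which is
	// how MetricsLabels lets multiple PD client instances in one process keep
	// their pd_client_* series apart.
	h := prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:        "demo_duration_seconds",
		ConstLabels: prometheus.Labels{"cluster": "test"},
	})
	h.Observe(0.25)

	// Dump the wire representation to show the label is attached.
	m := &dto.Metric{}
	_ = h.Write(m)
	fmt.Println(m.GetLabel()[0].GetName(), "=", m.GetLabel()[0].GetValue())
}
```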
diff --git a/client/metrics.go b/client/metrics.go
deleted file mode 100644
index fdcdd88f016..00000000000
--- a/client/metrics.go
+++ /dev/null
@@ -1,251 +0,0 @@
-// Copyright 2016 TiKV Project Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package pd
-
-import (
-	"sync/atomic"
-
-	"github.com/prometheus/client_golang/prometheus"
-)
-
-// make sure register metrics only once
-var initialized int32
-
-func init() {
-	initMetrics(prometheus.Labels{})
-	initCmdDurations()
-}
-
-func initAndRegisterMetrics(constLabels prometheus.Labels) {
-	if atomic.CompareAndSwapInt32(&initialized, 0, 1) {
-		// init metrics with constLabels
-		initMetrics(constLabels)
-		initCmdDurations()
-		// register metrics
-		registerMetrics()
-	}
-}
-
-var (
-	cmdDuration              *prometheus.HistogramVec
-	cmdFailedDuration        *prometheus.HistogramVec
-	requestDuration          *prometheus.HistogramVec
-	tsoBestBatchSize         prometheus.Histogram
-	tsoBatchSize             prometheus.Histogram
-	tsoBatchSendLatency      prometheus.Histogram
-	requestForwarded         *prometheus.GaugeVec
-	ongoingRequestCountGauge *prometheus.GaugeVec
-	estimateTSOLatencyGauge  *prometheus.GaugeVec
-)
-
-func initMetrics(constLabels prometheus.Labels) {
-	cmdDuration = prometheus.NewHistogramVec(
-		prometheus.HistogramOpts{
-			Namespace:   "pd_client",
-			Subsystem:   "cmd",
-			Name:        "handle_cmds_duration_seconds",
-			Help:        "Bucketed histogram of processing time (s) of handled success cmds.",
-			ConstLabels: constLabels,
-			Buckets:     prometheus.ExponentialBuckets(0.0005, 2, 13),
-		}, []string{"type"})
-
-	cmdFailedDuration = prometheus.NewHistogramVec(
-		prometheus.HistogramOpts{
-			Namespace:   "pd_client",
-			Subsystem:   "cmd",
-			Name:        "handle_failed_cmds_duration_seconds",
-			Help:        "Bucketed histogram of processing time (s) of failed handled cmds.",
-			ConstLabels: constLabels,
-			Buckets:     prometheus.ExponentialBuckets(0.0005, 2, 13),
-		}, []string{"type"})
-
-	requestDuration = prometheus.NewHistogramVec(
-		prometheus.HistogramOpts{
-			Namespace:   "pd_client",
-			Subsystem:   "request",
-			Name:        "handle_requests_duration_seconds",
-			Help:        "Bucketed histogram of processing time (s) of handled requests.",
-			ConstLabels: constLabels,
-			Buckets:     prometheus.ExponentialBuckets(0.0005, 2, 13),
-		}, []string{"type"})
-
-	tsoBestBatchSize = prometheus.NewHistogram(
-		prometheus.HistogramOpts{
-			Namespace:   "pd_client",
-			Subsystem:   "request",
-			Name:        "handle_tso_best_batch_size",
-			Help:        "Bucketed histogram of the best batch size of handled requests.",
-			ConstLabels: constLabels,
-			Buckets:     prometheus.ExponentialBuckets(1, 2, 13),
-		})
-
-	tsoBatchSize = prometheus.NewHistogram(
-		prometheus.HistogramOpts{
-			Namespace:   "pd_client",
-			Subsystem:   "request",
-			Name:        "handle_tso_batch_size",
-			Help:        "Bucketed histogram of the batch size of handled requests.",
-			ConstLabels: constLabels,
-			Buckets:     []float64{1, 2, 4, 8, 10, 14, 18, 22, 26, 30, 35, 40, 45, 50, 60, 70, 80, 90, 100, 110, 120, 140, 160, 180, 200, 500, 1000},
-		})
-
-	tsoBatchSendLatency = prometheus.NewHistogram(
-		prometheus.HistogramOpts{
-			Namespace:   "pd_client",
-			Subsystem:   "request",
-			Name:        "tso_batch_send_latency",
-			ConstLabels: constLabels,
-			Buckets:     prometheus.ExponentialBuckets(0.0005, 2, 13),
-			Help:        "tso batch send latency",
-		})
-
-	requestForwarded = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Namespace:   "pd_client",
-			Subsystem:   "request",
-			Name:        "forwarded_status",
-			Help:        "The status to indicate if the request is forwarded",
-			ConstLabels: constLabels,
-		}, []string{"host", "delegate"})
-
-	ongoingRequestCountGauge = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Namespace:   "pd_client",
-			Subsystem:   "request",
-			Name:        "ongoing_requests_count",
-			Help:        "Current count of ongoing batch tso requests",
-			ConstLabels: constLabels,
-		}, []string{"stream"})
-
-	estimateTSOLatencyGauge = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Namespace:   "pd_client",
-			Subsystem:   "request",
-			Name:        "estimate_tso_latency",
-			Help:        "Estimated latency of an RTT of getting TSO",
-			ConstLabels: constLabels,
-		}, []string{"stream"})
-}
-
-var (
-	cmdDurationTSOWait                  prometheus.Observer
-	cmdDurationTSO                      prometheus.Observer
-	cmdDurationTSOAsyncWait             prometheus.Observer
-	cmdDurationGetRegion                prometheus.Observer
-	cmdDurationGetAllMembers            prometheus.Observer
-	cmdDurationGetPrevRegion            prometheus.Observer
-	cmdDurationGetRegionByID            prometheus.Observer
-	cmdDurationScanRegions              prometheus.Observer
-	cmdDurationBatchScanRegions         prometheus.Observer
-	cmdDurationGetStore                 prometheus.Observer
-	cmdDurationGetAllStores             prometheus.Observer
-	cmdDurationUpdateGCSafePoint        prometheus.Observer
-	cmdDurationUpdateServiceGCSafePoint prometheus.Observer
-	cmdDurationScatterRegion            prometheus.Observer
-	cmdDurationScatterRegions           prometheus.Observer
-	cmdDurationGetOperator              prometheus.Observer
-	cmdDurationSplitRegions             prometheus.Observer
-	cmdDurationSplitAndScatterRegions   prometheus.Observer
-	cmdDurationLoadKeyspace             prometheus.Observer
-	cmdDurationUpdateKeyspaceState      prometheus.Observer
-	cmdDurationGetAllKeyspaces          prometheus.Observer
-	cmdDurationGet                      prometheus.Observer
-	cmdDurationPut                      prometheus.Observer
-	cmdDurationUpdateGCSafePointV2      prometheus.Observer
-	cmdDurationUpdateServiceSafePointV2 prometheus.Observer
-
-	cmdFailDurationGetRegion                  prometheus.Observer
-	cmdFailDurationTSOWait                    prometheus.Observer
-	cmdFailDurationTSO                        prometheus.Observer
-	cmdFailDurationGetAllMembers              prometheus.Observer
-	cmdFailDurationGetPrevRegion              prometheus.Observer
-	cmdFailedDurationGetRegionByID            prometheus.Observer
-	cmdFailedDurationScanRegions              prometheus.Observer
-	cmdFailedDurationBatchScanRegions         prometheus.Observer
-	cmdFailedDurationGetStore                 prometheus.Observer
-	cmdFailedDurationGetAllStores             prometheus.Observer
-	cmdFailedDurationUpdateGCSafePoint        prometheus.Observer
-	cmdFailedDurationUpdateServiceGCSafePoint prometheus.Observer
-	cmdFailedDurationLoadKeyspace             prometheus.Observer
-	cmdFailedDurationUpdateKeyspaceState      prometheus.Observer
-	cmdFailedDurationGet                      prometheus.Observer
-	cmdFailedDurationPut                      prometheus.Observer
-	cmdFailedDurationUpdateGCSafePointV2      prometheus.Observer
-	cmdFailedDurationUpdateServiceSafePointV2 prometheus.Observer
-
-	requestDurationTSO       prometheus.Observer
-	requestFailedDurationTSO prometheus.Observer
-)
-
-func initCmdDurations() {
-	// WithLabelValues is a heavy operation, define variable to avoid call it every time.
-	cmdDurationTSOWait = cmdDuration.WithLabelValues("wait")
-	cmdDurationTSO = cmdDuration.WithLabelValues("tso")
-	cmdDurationTSOAsyncWait = cmdDuration.WithLabelValues("tso_async_wait")
-	cmdDurationGetRegion = cmdDuration.WithLabelValues("get_region")
-	cmdDurationGetAllMembers = cmdDuration.WithLabelValues("get_member_info")
-	cmdDurationGetPrevRegion = cmdDuration.WithLabelValues("get_prev_region")
-	cmdDurationGetRegionByID = cmdDuration.WithLabelValues("get_region_byid")
-	cmdDurationScanRegions = cmdDuration.WithLabelValues("scan_regions")
-	cmdDurationBatchScanRegions = cmdDuration.WithLabelValues("batch_scan_regions")
-	cmdDurationGetStore = cmdDuration.WithLabelValues("get_store")
-	cmdDurationGetAllStores = cmdDuration.WithLabelValues("get_all_stores")
-	cmdDurationUpdateGCSafePoint = cmdDuration.WithLabelValues("update_gc_safe_point")
-	cmdDurationUpdateServiceGCSafePoint = cmdDuration.WithLabelValues("update_service_gc_safe_point")
-	cmdDurationScatterRegion = cmdDuration.WithLabelValues("scatter_region")
-	cmdDurationScatterRegions = cmdDuration.WithLabelValues("scatter_regions")
-	cmdDurationGetOperator = cmdDuration.WithLabelValues("get_operator")
-	cmdDurationSplitRegions = cmdDuration.WithLabelValues("split_regions")
-	cmdDurationSplitAndScatterRegions = cmdDuration.WithLabelValues("split_and_scatter_regions")
-	cmdDurationLoadKeyspace = cmdDuration.WithLabelValues("load_keyspace")
-	cmdDurationUpdateKeyspaceState = cmdDuration.WithLabelValues("update_keyspace_state")
-	cmdDurationGetAllKeyspaces = cmdDuration.WithLabelValues("get_all_keyspaces")
-	cmdDurationGet = cmdDuration.WithLabelValues("get")
-	cmdDurationPut = cmdDuration.WithLabelValues("put")
-	cmdDurationUpdateGCSafePointV2 = cmdDuration.WithLabelValues("update_gc_safe_point_v2")
-	cmdDurationUpdateServiceSafePointV2 = cmdDuration.WithLabelValues("update_service_safe_point_v2")
-
-	cmdFailDurationGetRegion = cmdFailedDuration.WithLabelValues("get_region")
-	cmdFailDurationTSOWait = cmdFailedDuration.WithLabelValues("wait")
-	cmdFailDurationTSO = cmdFailedDuration.WithLabelValues("tso")
-	cmdFailDurationGetAllMembers = cmdFailedDuration.WithLabelValues("get_member_info")
-	cmdFailDurationGetPrevRegion = cmdFailedDuration.WithLabelValues("get_prev_region")
-	cmdFailedDurationGetRegionByID = cmdFailedDuration.WithLabelValues("get_region_byid")
-	cmdFailedDurationScanRegions = cmdFailedDuration.WithLabelValues("scan_regions")
-	cmdFailedDurationBatchScanRegions = cmdFailedDuration.WithLabelValues("batch_scan_regions")
-	cmdFailedDurationGetStore = cmdFailedDuration.WithLabelValues("get_store")
-	cmdFailedDurationGetAllStores = cmdFailedDuration.WithLabelValues("get_all_stores")
-	cmdFailedDurationUpdateGCSafePoint = cmdFailedDuration.WithLabelValues("update_gc_safe_point")
-	cmdFailedDurationUpdateServiceGCSafePoint = cmdFailedDuration.WithLabelValues("update_service_gc_safe_point")
-	cmdFailedDurationLoadKeyspace = cmdFailedDuration.WithLabelValues("load_keyspace")
-	cmdFailedDurationUpdateKeyspaceState = cmdFailedDuration.WithLabelValues("update_keyspace_state")
-	cmdFailedDurationGet = cmdFailedDuration.WithLabelValues("get")
-	cmdFailedDurationPut = cmdFailedDuration.WithLabelValues("put")
-	cmdFailedDurationUpdateGCSafePointV2 = cmdFailedDuration.WithLabelValues("update_gc_safe_point_v2")
-	cmdFailedDurationUpdateServiceSafePointV2 = cmdFailedDuration.WithLabelValues("update_service_safe_point_v2")
-
-	requestDurationTSO = requestDuration.WithLabelValues("tso")
-	requestFailedDurationTSO = requestDuration.WithLabelValues("tso-failed")
-}
-
-func registerMetrics() {
-	prometheus.MustRegister(cmdDuration)
-	prometheus.MustRegister(cmdFailedDuration)
-	prometheus.MustRegister(requestDuration)
-	prometheus.MustRegister(tsoBestBatchSize)
-	prometheus.MustRegister(tsoBatchSize)
-	prometheus.MustRegister(tsoBatchSendLatency)
-	prometheus.MustRegister(requestForwarded)
-	prometheus.MustRegister(estimateTSOLatencyGauge)
-}
diff --git a/client/metrics/metrics.go b/client/metrics/metrics.go
new file mode 100644
index 00000000000..da36217eb34
--- /dev/null
+++ b/client/metrics/metrics.go
@@ -0,0 +1,262 @@
+// Copyright 2016 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metrics
+
+import (
+	"sync/atomic"
+
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+// make sure register metrics only once
+var initialized int32
+
+func init() {
+	initMetrics(prometheus.Labels{})
+	initCmdDurations()
+}
+
+// InitAndRegisterMetrics initializes and registers the metrics manually.
+func InitAndRegisterMetrics(constLabels prometheus.Labels) {
+	if atomic.CompareAndSwapInt32(&initialized, 0, 1) {
+		// init metrics with constLabels
+		initMetrics(constLabels)
+		initCmdDurations()
+		// register metrics
+		registerMetrics()
+	}
+}
+
+var (
+	cmdDuration       *prometheus.HistogramVec
+	cmdFailedDuration *prometheus.HistogramVec
+	requestDuration   *prometheus.HistogramVec
+
+	// TSOBestBatchSize is the histogram of the best batch size of TSO requests.
+	TSOBestBatchSize prometheus.Histogram
+	// TSOBatchSize is the histogram of the batch size of TSO requests.
+	TSOBatchSize prometheus.Histogram
+	// TSOBatchSendLatency is the histogram of the latency of sending TSO requests.
+	TSOBatchSendLatency prometheus.Histogram
+	// RequestForwarded is the gauge to indicate if the request is forwarded.
+	RequestForwarded *prometheus.GaugeVec
+	// OngoingRequestCountGauge is the gauge to indicate the count of ongoing TSO requests.
+	OngoingRequestCountGauge *prometheus.GaugeVec
+	// EstimateTSOLatencyGauge is the gauge to indicate the estimated latency of TSO requests.
+	EstimateTSOLatencyGauge *prometheus.GaugeVec
+)
+
+func initMetrics(constLabels prometheus.Labels) {
+	cmdDuration = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace:   "pd_client",
+			Subsystem:   "cmd",
+			Name:        "handle_cmds_duration_seconds",
+			Help:        "Bucketed histogram of processing time (s) of handled success cmds.",
+			ConstLabels: constLabels,
+			Buckets:     prometheus.ExponentialBuckets(0.0005, 2, 13),
+		}, []string{"type"})
+
+	cmdFailedDuration = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace:   "pd_client",
+			Subsystem:   "cmd",
+			Name:        "handle_failed_cmds_duration_seconds",
+			Help:        "Bucketed histogram of processing time (s) of failed handled cmds.",
+			ConstLabels: constLabels,
+			Buckets:     prometheus.ExponentialBuckets(0.0005, 2, 13),
+		}, []string{"type"})
+
+	requestDuration = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace:   "pd_client",
+			Subsystem:   "request",
+			Name:        "handle_requests_duration_seconds",
+			Help:        "Bucketed histogram of processing time (s) of handled requests.",
+			ConstLabels: constLabels,
+			Buckets:     prometheus.ExponentialBuckets(0.0005, 2, 13),
+		}, []string{"type"})
+
+	TSOBestBatchSize = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace:   "pd_client",
+			Subsystem:   "request",
+			Name:        "handle_tso_best_batch_size",
+			Help:        "Bucketed histogram of the best batch size of handled requests.",
+			ConstLabels: constLabels,
+			Buckets:     prometheus.ExponentialBuckets(1, 2, 13),
+		})
+
+	TSOBatchSize = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace:   "pd_client",
+			Subsystem:   "request",
+			Name:        "handle_tso_batch_size",
+			Help:        "Bucketed histogram of the batch size of handled requests.",
+			ConstLabels: constLabels,
+			Buckets:     []float64{1, 2, 4, 8, 10, 14, 18, 22, 26, 30, 35, 40, 45, 50, 60, 70, 80, 90, 100, 110, 120, 140, 160, 180, 200, 500, 1000},
+		})
+
+	TSOBatchSendLatency = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace:   "pd_client",
+			Subsystem:   "request",
+			Name:        "tso_batch_send_latency",
+			ConstLabels: constLabels,
+			Buckets:     prometheus.ExponentialBuckets(0.0005, 2, 13),
+			Help:        "tso batch send latency",
+		})
+
+	RequestForwarded = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace:   "pd_client",
+			Subsystem:   "request",
+			Name:        "forwarded_status",
+			Help:        "The status to indicate if the request is forwarded",
+			ConstLabels: constLabels,
+		}, []string{"host", "delegate"})
+
+	OngoingRequestCountGauge = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace:   "pd_client",
+			Subsystem:   "request",
+			Name:        "ongoing_requests_count",
+			Help:        "Current count of ongoing batch tso requests",
+			ConstLabels: constLabels,
+		}, []string{"stream"})
+	EstimateTSOLatencyGauge = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace:   "pd_client",
+			Subsystem:   "request",
+			Name:        "estimate_tso_latency",
+			Help:        "Estimated latency of an RTT of getting TSO",
+			ConstLabels: constLabels,
+		}, []string{"stream"})
+}
+
+// CmdDurationXXX and CmdFailedDurationXXX are the durations of the client commands.
+var (
+	CmdDurationTSOWait                  prometheus.Observer
+	CmdDurationTSO                      prometheus.Observer
+	CmdDurationTSOAsyncWait             prometheus.Observer
+	CmdDurationGetRegion                prometheus.Observer
+	CmdDurationGetAllMembers            prometheus.Observer
+	CmdDurationGetPrevRegion            prometheus.Observer
+	CmdDurationGetRegionByID            prometheus.Observer
+	CmdDurationScanRegions              prometheus.Observer
+	CmdDurationBatchScanRegions         prometheus.Observer
+	CmdDurationGetStore                 prometheus.Observer
+	CmdDurationGetAllStores             prometheus.Observer
+	CmdDurationUpdateGCSafePoint        prometheus.Observer
+	CmdDurationUpdateServiceGCSafePoint prometheus.Observer
+	CmdDurationScatterRegion            prometheus.Observer
+	CmdDurationScatterRegions           prometheus.Observer
+	CmdDurationGetOperator              prometheus.Observer
+	CmdDurationSplitRegions             prometheus.Observer
+	CmdDurationSplitAndScatterRegions   prometheus.Observer
+	CmdDurationLoadKeyspace             prometheus.Observer
+	CmdDurationUpdateKeyspaceState      prometheus.Observer
+	CmdDurationGetAllKeyspaces          prometheus.Observer
+	CmdDurationGet                      prometheus.Observer
+	CmdDurationPut                      prometheus.Observer
+	CmdDurationUpdateGCSafePointV2      prometheus.Observer
+	CmdDurationUpdateServiceSafePointV2 prometheus.Observer
+
+	CmdFailedDurationGetRegion                prometheus.Observer
+	CmdFailedDurationTSOWait                  prometheus.Observer
+	CmdFailedDurationTSO                      prometheus.Observer
+	CmdFailedDurationGetAllMembers            prometheus.Observer
+	CmdFailedDurationGetPrevRegion            prometheus.Observer
+	CmdFailedDurationGetRegionByID            prometheus.Observer
+	CmdFailedDurationScanRegions              prometheus.Observer
+	CmdFailedDurationBatchScanRegions         prometheus.Observer
+	CmdFailedDurationGetStore                 prometheus.Observer
+	CmdFailedDurationGetAllStores             prometheus.Observer
+	CmdFailedDurationUpdateGCSafePoint        prometheus.Observer
+	CmdFailedDurationUpdateServiceGCSafePoint prometheus.Observer
+	CmdFailedDurationLoadKeyspace             prometheus.Observer
+	CmdFailedDurationUpdateKeyspaceState      prometheus.Observer
+	CmdFailedDurationGet                      prometheus.Observer
+	CmdFailedDurationPut                      prometheus.Observer
+	CmdFailedDurationUpdateGCSafePointV2      prometheus.Observer
+	CmdFailedDurationUpdateServiceSafePointV2 prometheus.Observer
+
+	// RequestDurationTSO records the durations of the successful TSO requests.
+	RequestDurationTSO prometheus.Observer
+	// RequestFailedDurationTSO records the durations of the failed TSO requests.
+	RequestFailedDurationTSO prometheus.Observer
+)
+
+func initCmdDurations() {
+	// WithLabelValues is a heavy operation, define variable to avoid call it every time.
+	CmdDurationTSOWait = cmdDuration.WithLabelValues("wait")
+	CmdDurationTSO = cmdDuration.WithLabelValues("tso")
+	CmdDurationTSOAsyncWait = cmdDuration.WithLabelValues("tso_async_wait")
+	CmdDurationGetRegion = cmdDuration.WithLabelValues("get_region")
+	CmdDurationGetAllMembers = cmdDuration.WithLabelValues("get_member_info")
+	CmdDurationGetPrevRegion = cmdDuration.WithLabelValues("get_prev_region")
+	CmdDurationGetRegionByID = cmdDuration.WithLabelValues("get_region_byid")
+	CmdDurationScanRegions = cmdDuration.WithLabelValues("scan_regions")
+	CmdDurationBatchScanRegions = cmdDuration.WithLabelValues("batch_scan_regions")
+	CmdDurationGetStore = cmdDuration.WithLabelValues("get_store")
+	CmdDurationGetAllStores = cmdDuration.WithLabelValues("get_all_stores")
+	CmdDurationUpdateGCSafePoint = cmdDuration.WithLabelValues("update_gc_safe_point")
+	CmdDurationUpdateServiceGCSafePoint = cmdDuration.WithLabelValues("update_service_gc_safe_point")
+	CmdDurationScatterRegion = cmdDuration.WithLabelValues("scatter_region")
+	CmdDurationScatterRegions = cmdDuration.WithLabelValues("scatter_regions")
+	CmdDurationGetOperator = cmdDuration.WithLabelValues("get_operator")
+	CmdDurationSplitRegions = cmdDuration.WithLabelValues("split_regions")
+	CmdDurationSplitAndScatterRegions = cmdDuration.WithLabelValues("split_and_scatter_regions")
+	CmdDurationLoadKeyspace = cmdDuration.WithLabelValues("load_keyspace")
+	CmdDurationUpdateKeyspaceState = cmdDuration.WithLabelValues("update_keyspace_state")
+	CmdDurationGetAllKeyspaces = cmdDuration.WithLabelValues("get_all_keyspaces")
+	CmdDurationGet = cmdDuration.WithLabelValues("get")
+	CmdDurationPut = cmdDuration.WithLabelValues("put")
+	CmdDurationUpdateGCSafePointV2 = cmdDuration.WithLabelValues("update_gc_safe_point_v2")
+	CmdDurationUpdateServiceSafePointV2 = cmdDuration.WithLabelValues("update_service_safe_point_v2")
+
+	CmdFailedDurationGetRegion = cmdFailedDuration.WithLabelValues("get_region")
+	CmdFailedDurationTSOWait = cmdFailedDuration.WithLabelValues("wait")
+	CmdFailedDurationTSO = cmdFailedDuration.WithLabelValues("tso")
+	CmdFailedDurationGetAllMembers = cmdFailedDuration.WithLabelValues("get_member_info")
+	CmdFailedDurationGetPrevRegion = cmdFailedDuration.WithLabelValues("get_prev_region")
+	CmdFailedDurationGetRegionByID = cmdFailedDuration.WithLabelValues("get_region_byid")
+	CmdFailedDurationScanRegions = cmdFailedDuration.WithLabelValues("scan_regions")
+	CmdFailedDurationBatchScanRegions = cmdFailedDuration.WithLabelValues("batch_scan_regions")
+	CmdFailedDurationGetStore = cmdFailedDuration.WithLabelValues("get_store")
+	CmdFailedDurationGetAllStores = cmdFailedDuration.WithLabelValues("get_all_stores")
+	CmdFailedDurationUpdateGCSafePoint = cmdFailedDuration.WithLabelValues("update_gc_safe_point")
+	CmdFailedDurationUpdateServiceGCSafePoint = cmdFailedDuration.WithLabelValues("update_service_gc_safe_point")
+	CmdFailedDurationLoadKeyspace = cmdFailedDuration.WithLabelValues("load_keyspace")
+	CmdFailedDurationUpdateKeyspaceState = cmdFailedDuration.WithLabelValues("update_keyspace_state")
+	CmdFailedDurationGet = cmdFailedDuration.WithLabelValues("get")
+	CmdFailedDurationPut = cmdFailedDuration.WithLabelValues("put")
+	CmdFailedDurationUpdateGCSafePointV2 = cmdFailedDuration.WithLabelValues("update_gc_safe_point_v2")
+	CmdFailedDurationUpdateServiceSafePointV2 = cmdFailedDuration.WithLabelValues("update_service_safe_point_v2")
+
+	RequestDurationTSO = requestDuration.WithLabelValues("tso")
+	RequestFailedDurationTSO = requestDuration.WithLabelValues("tso-failed")
+}
+
+func registerMetrics() {
+	prometheus.MustRegister(cmdDuration)
+	prometheus.MustRegister(cmdFailedDuration)
+	prometheus.MustRegister(requestDuration)
+	prometheus.MustRegister(TSOBestBatchSize)
+	prometheus.MustRegister(TSOBatchSize)
+	prometheus.MustRegister(TSOBatchSendLatency)
+	prometheus.MustRegister(RequestForwarded)
+	prometheus.MustRegister(EstimateTSOLatencyGauge)
+}
diff --git a/client/tso_client.go b/client/tso_client.go
index 18e39dffd14..cdd85dd2479 100644
--- a/client/tso_client.go
+++ b/client/tso_client.go
@@ -26,6 +26,7 @@ import (
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/client/errs"
+	"github.com/tikv/pd/client/metrics"
 	"github.com/tikv/pd/client/opt"
 	"github.com/tikv/pd/client/utils/grpcutil"
 	"go.uber.org/zap"
@@ -336,7 +337,7 @@ func (c *tsoClient) tryConnectToTSO(
 				addr := trimHTTPPrefix(backupURL)
 				// the goroutine is used to check the network and change back to the original stream
 				go c.checkLeader(ctx, cancel, forwardedHostTrim, addr, url, updateAndClear)
-				requestForwarded.WithLabelValues(forwardedHostTrim, addr).Set(1)
+				metrics.RequestForwarded.WithLabelValues(forwardedHostTrim, addr).Set(1)
 				updateAndClear(backupURL, &tsoConnectionContext{cctx, cancel, backupURL, stream})
 				return nil
 			}
@@ -355,7 +356,7 @@ func (c *tsoClient) checkLeader(
 	defer func() {
 		// cancel the forward stream
 		forwardCancel()
-		requestForwarded.WithLabelValues(forwardedHostTrim, addr).Set(0)
+		metrics.RequestForwarded.WithLabelValues(forwardedHostTrim, addr).Set(0)
 	}()
 	cc, u := c.getTSOLeaderClientConn()
 	var healthCli healthpb.HealthClient
@@ -441,7 +442,7 @@ func (c *tsoClient) tryConnectToTSOWithProxy(
 		if addr != leaderAddr {
 			forwardedHostTrim := trimHTTPPrefix(forwardedHost)
 			addrTrim := trimHTTPPrefix(addr)
-			requestForwarded.WithLabelValues(forwardedHostTrim, addrTrim).Set(1)
+			metrics.RequestForwarded.WithLabelValues(forwardedHostTrim, addrTrim).Set(1)
 		}
 		connectionCtxs.Store(addr, &tsoConnectionContext{cctx, cancel, addr, stream})
 		continue
diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go
index 7d19a11c2d0..c696dc26b36 100644
--- a/client/tso_dispatcher.go
+++ b/client/tso_dispatcher.go
@@ -29,6 +29,7 @@ import (
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/client/errs"
+	"github.com/tikv/pd/client/metrics"
 	"github.com/tikv/pd/client/opt"
 	"github.com/tikv/pd/client/retry"
 	"github.com/tikv/pd/client/utils/timerutil"
@@ -127,7 +128,7 @@ func newTSODispatcher(
 			return newBatchController[*tsoRequest](
 				maxBatchSize*2,
 				tsoRequestFinisher(0, 0, invalidStreamID),
-				tsoBestBatchSize,
+				metrics.TSOBestBatchSize,
 			)
 		},
 	},
diff --git a/client/tso_request.go b/client/tso_request.go
index 29654752cd0..d2048e4b3b1 100644
--- a/client/tso_request.go
+++ b/client/tso_request.go
@@ -21,6 +21,7 @@ import (
 	"time"
 
 	"github.com/pingcap/errors"
+	"github.com/tikv/pd/client/metrics"
 )
 
 // TSFuture is a future which promises to return a TSO.
@@ -67,7 +68,7 @@ func (req *tsoRequest) waitCtx(ctx context.Context) (physical int64, logical int
 	// If tso command duration is observed very high, the reason could be it
 	// takes too long for Wait() be called.
 	start := time.Now()
-	cmdDurationTSOAsyncWait.Observe(start.Sub(req.start).Seconds())
+	metrics.CmdDurationTSOAsyncWait.Observe(start.Sub(req.start).Seconds())
 	select {
 	case err = <-req.done:
 		defer req.pool.Put(req)
@@ -75,13 +76,13 @@ func (req *tsoRequest) waitCtx(ctx context.Context) (physical int64, logical int
 		err = errors.WithStack(err)
 		now := time.Now()
 		if err != nil {
-			cmdFailDurationTSOWait.Observe(now.Sub(start).Seconds())
-			cmdFailDurationTSO.Observe(now.Sub(req.start).Seconds())
+			metrics.CmdFailedDurationTSOWait.Observe(now.Sub(start).Seconds())
+			metrics.CmdFailedDurationTSO.Observe(now.Sub(req.start).Seconds())
 			return 0, 0, err
 		}
 		physical, logical = req.physical, req.logical
-		cmdDurationTSOWait.Observe(now.Sub(start).Seconds())
-		cmdDurationTSO.Observe(now.Sub(req.start).Seconds())
+		metrics.CmdDurationTSOWait.Observe(now.Sub(start).Seconds())
+		metrics.CmdDurationTSO.Observe(now.Sub(req.start).Seconds())
 		return
 	case <-ctx.Done():
 		return 0, 0, errors.WithStack(ctx.Err())
diff --git a/client/tso_stream.go b/client/tso_stream.go
index 55bfd0b72b0..51ae5696dc4 100644
--- a/client/tso_stream.go
+++ b/client/tso_stream.go
@@ -30,6 +30,7 @@ import (
 	"github.com/pingcap/log"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/tikv/pd/client/errs"
+	"github.com/tikv/pd/client/metrics"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
 )
@@ -243,7 +244,7 @@ func newTSOStream(ctx context.Context, serverURL string, stream grpcTSOStreamAda
 		cancel: cancel,
 
-		ongoingRequestCountGauge: ongoingRequestCountGauge.WithLabelValues(streamID),
+		ongoingRequestCountGauge: metrics.OngoingRequestCountGauge.WithLabelValues(streamID),
 	}
 	s.wg.Add(1)
 	go s.recvLoop(ctx)
@@ -309,7 +310,7 @@ func (s *tsoStream) processRequests(
 		log.Warn("failed to send RPC request through tsoStream", zap.String("stream", s.streamID), zap.Error(err))
 		return nil
 	}
-	tsoBatchSendLatency.Observe(time.Since(batchStartTime).Seconds())
+	metrics.TSOBatchSendLatency.Observe(time.Since(batchStartTime).Seconds())
 	s.ongoingRequestCountGauge.Set(float64(s.ongoingRequests.Add(1)))
 	return nil
 }
@@ -382,7 +383,7 @@ func (s *tsoStream) recvLoop(ctx context.Context) {
 		micros := math.Exp(filteredValue)
 		s.estimatedLatencyMicros.Store(uint64(micros))
 		// Update the metrics in seconds.
-		estimateTSOLatencyGauge.WithLabelValues(s.streamID).Set(micros * 1e-6)
+		metrics.EstimateTSOLatencyGauge.WithLabelValues(s.streamID).Set(micros * 1e-6)
 	}
 
recvLoop:
@@ -413,7 +414,7 @@ recvLoop:
 		// Note that it's also possible that the stream is broken due to network without being requested. In this
 		// case, `Recv` may return an error while no request is pending.
 		if hasReq {
-			requestFailedDurationTSO.Observe(latencySeconds)
+			metrics.RequestFailedDurationTSO.Observe(latencySeconds)
 		}
 		if err == io.EOF {
 			finishWithErr = errors.WithStack(errs.ErrClientTSOStreamClosed)
@@ -426,8 +427,8 @@ recvLoop:
 			break recvLoop
 		}
 
-		requestDurationTSO.Observe(latencySeconds)
-		tsoBatchSize.Observe(float64(res.count))
+		metrics.RequestDurationTSO.Observe(latencySeconds)
+		metrics.TSOBatchSize.Observe(float64(res.count))
 		updateEstimatedLatency(currentReq.startTime, latency)
 
 		if res.count != uint32(currentReq.count) {
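The `recvLoop` hunk shows the latency estimate being produced by exponentiating a filtered value (`math.Exp(filteredValue)`) before it is stored and pushed to `EstimateTSOLatencyGauge`, i.e. the smoothing happens in log space. A standalone sketch of that shape of filter follows; the `estimator` type and the `alpha` value are illustrative assumptions, not the client's actual parameters:

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// estimator keeps an exponentially weighted moving average of log-latency.
// Filtering in log space and exponentiating back out damps the effect of
// occasional large outliers on the estimate.
type estimator struct {
	alpha    float64 // smoothing factor, e.g. 0.2 (assumed)
	filtered float64 // EWMA of log(latency in microseconds)
	primed   bool
}

func (e *estimator) update(sample time.Duration) {
	x := math.Log(float64(sample.Microseconds()))
	if !e.primed {
		e.filtered, e.primed = x, true
		return
	}
	e.filtered = e.alpha*x + (1-e.alpha)*e.filtered
}

func (e *estimator) estimate() time.Duration {
	return time.Duration(math.Exp(e.filtered)) * time.Microsecond
}

func main() {
	e := &estimator{alpha: 0.2}
	for _, s := range []time.Duration{900 * time.Microsecond, 1100 * time.Microsecond, 5 * time.Millisecond} {
		e.update(s)
	}
	fmt.Println("estimated RTT:", e.estimate())
}
```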