diff --git a/metrics/grafana/client_java_summary.json b/metrics/grafana/client_java_summary.json index deaa2c9b34b..46895dd20aa 100644 --- a/metrics/grafana/client_java_summary.json +++ b/metrics/grafana/client_java_summary.json @@ -113,7 +113,7 @@ "targets": [ { "exemplar": true, - "expr": "sum(rate(client_java_raw_requests_latency_count{instance=~\"$instance\"}[$__rate_interval])) by (type)", + "expr": "sum(rate(client_java_raw_requests_latency_count{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (type)", "format": "time_series", "interval": "", "intervalFactor": 1, @@ -214,7 +214,7 @@ "targets": [ { "exemplar": true, - "expr": "sum(rate(client_java_raw_requests_failure_total{instance=~\"$instance\"}[$__rate_interval])) by (type)", + "expr": "sum(rate(client_java_raw_requests_failure_total{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (type)", "format": "time_series", "interval": "", "intervalFactor": 1, @@ -315,7 +315,7 @@ "targets": [ { "exemplar": true, - "expr": "histogram_quantile(0.99, sum(rate(client_java_raw_requests_latency_bucket{instance=~\"$instance\"}[$__rate_interval])) by (le, type))", + "expr": "histogram_quantile(0.99, sum(rate(client_java_raw_requests_latency_bucket{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (le, type))", "format": "time_series", "hide": false, "interval": "", @@ -325,7 +325,7 @@ }, { "exemplar": true, - "expr": "histogram_quantile(1, sum(rate(client_java_raw_requests_latency_bucket{instance=~\"$instance\"}[$__rate_interval])) by (le, type))", + "expr": "histogram_quantile(1, sum(rate(client_java_raw_requests_latency_bucket{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (le, type))", "hide": false, "interval": "", "legendFormat": "{{type}} - max", @@ -435,7 +435,7 @@ "targets": [ { "exemplar": true, - "expr": "sum(rate(client_java_raw_requests_latency_sum{instance=~\"$instance\"}[$__rate_interval])) by (type) / sum(rate(client_java_raw_requests_latency_count{instance=~\"$instance\"}[$__rate_interval])) by (type)", + "expr": "sum(rate(client_java_raw_requests_latency_sum{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (type) / sum(rate(client_java_raw_requests_latency_count{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (type)", "format": "time_series", "hide": false, "interval": "", @@ -561,7 +561,7 @@ "targets": [ { "exemplar": true, - "expr": "histogram_quantile(1, sum(rate(client_java_grpc_raw_requests_latency_bucket{instance=~\"$instance\"}[$__rate_interval])) by (le, type))", + "expr": "histogram_quantile(1, sum(rate(client_java_grpc_raw_requests_latency_bucket{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (le, type))", "format": "time_series", "instant": false, "interval": "", @@ -572,7 +572,7 @@ }, { "exemplar": true, - "expr": "histogram_quantile(0.99, sum(rate(client_java_grpc_raw_requests_latency_bucket{instance=~\"$instance\"}[$__rate_interval])) by (le, type))", + "expr": "histogram_quantile(0.99, sum(rate(client_java_grpc_raw_requests_latency_bucket{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (le, type))", "format": "time_series", "hide": false, "instant": false, @@ -686,7 +686,7 @@ "targets": [ { "exemplar": true, - "expr": "sum(rate(client_java_grpc_raw_requests_latency_sum{instance=~\"$instance\"}[$__rate_interval])) by (type) / sum(rate(client_java_grpc_raw_requests_latency_count{instance=~\"$instance\"}[$__rate_interval])) by (type)", + 
"expr": "sum(rate(client_java_grpc_raw_requests_latency_sum{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (type) / sum(rate(client_java_grpc_raw_requests_latency_count{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (type)", "format": "time_series", "interval": "", "intervalFactor": 1, @@ -796,7 +796,7 @@ "targets": [ { "exemplar": true, - "expr": "histogram_quantile(1,sum(rate(client_java_grpc_single_requests_latency_bucket{instance=~\"$instance\", type!=\"pdpb.PD/GetRegion\", type!=\"pdpb.PD/GetStore\"}[$__rate_interval])) by (le, type))", + "expr": "histogram_quantile(1,sum(rate(client_java_grpc_single_requests_latency_bucket{instance=~\"$instance\", cluster=~\"$cluster\", type!=\"pdpb.PD/GetRegion\", type!=\"pdpb.PD/GetStore\"}[$__rate_interval])) by (le, type))", "interval": "", "legendFormat": "{{ type }} -- max", "queryType": "randomWalk", @@ -804,7 +804,7 @@ }, { "exemplar": true, - "expr": "histogram_quantile(0.99,sum(rate(client_java_grpc_single_requests_latency_bucket{instance=~\"$instance\", type!=\"pdpb.PD/GetRegion\", type!=\"pdpb.PD/GetStore\"}[$__rate_interval])) by (le, type))", + "expr": "histogram_quantile(0.99,sum(rate(client_java_grpc_single_requests_latency_bucket{instance=~\"$instance\", cluster=~\"$cluster\", type!=\"pdpb.PD/GetRegion\", type!=\"pdpb.PD/GetStore\"}[$__rate_interval])) by (le, type))", "hide": false, "interval": "", "legendFormat": "{{ type }} -- 99", @@ -914,7 +914,7 @@ "targets": [ { "exemplar": true, - "expr": "sum(rate(client_java_grpc_single_requests_latency_sum{instance=~\"$instance\", type!=\"pdpb.PD/GetRegion\", type!=\"pdpb.PD/GetStore\"}[$__rate_interval])) by (type) / sum(rate(client_java_grpc_single_requests_latency_count{instance=~\"$instance\", type!=\"pdpb.PD/GetRegion\", type!=\"pdpb.PD/GetStore\"}[$__rate_interval])) by (type)", + "expr": "sum(rate(client_java_grpc_single_requests_latency_sum{instance=~\"$instance\", cluster=~\"$cluster\", type!=\"pdpb.PD/GetRegion\", type!=\"pdpb.PD/GetStore\"}[$__rate_interval])) by (type) / sum(rate(client_java_grpc_single_requests_latency_count{instance=~\"$instance\", cluster=~\"$cluster\", type!=\"pdpb.PD/GetRegion\", type!=\"pdpb.PD/GetStore\"}[$__rate_interval])) by (type)", "interval": "", "legendFormat": "{{ type }}", "queryType": "randomWalk", @@ -1023,7 +1023,7 @@ "targets": [ { "exemplar": true, - "expr": "sum(rate(client_java_seek_leader_store_duration_sum{instance=~\"$instance\"}[$__rate_interval])) by (le) / sum(rate(client_java_seek_leader_store_duration_count{instance=~\"$instance\"}[$__rate_interval])) by (le)", + "expr": "sum(rate(client_java_seek_leader_store_duration_sum{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (le) / sum(rate(client_java_seek_leader_store_duration_count{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (le)", "interval": "", "legendFormat": "seek-leader-store-avg", "queryType": "randomWalk", @@ -1031,7 +1031,7 @@ }, { "exemplar": true, - "expr": "histogram_quantile(0.99,sum(rate(client_java_seek_leader_store_duration_bucket{instance=~\"$instance\"}[$__rate_interval])) by (le, type))", + "expr": "histogram_quantile(0.99,sum(rate(client_java_seek_leader_store_duration_bucket{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (le, type))", "hide": false, "interval": "", "legendFormat": "seek-leader-store-99", @@ -1140,7 +1140,7 @@ "targets": [ { "exemplar": true, - "expr": 
"sum(rate(client_java_seek_proxy_store_duration_sum{instance=~\"$instance\"}[$__rate_interval])) by (le) / sum(rate(client_java_seek_proxy_store_duration_count{instance=~\"$instance\"}[$__rate_interval])) by (le)", + "expr": "sum(rate(client_java_seek_proxy_store_duration_sum{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (le) / sum(rate(client_java_seek_proxy_store_duration_count{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (le)", "interval": "", "legendFormat": "seek-proxy-store-avg", "queryType": "randomWalk", @@ -1148,7 +1148,7 @@ }, { "exemplar": true, - "expr": "histogram_quantile(0.99,sum(rate(client_java_seek_proxy_store_duration_bucket{instance=~\"$instance\"}[$__rate_interval])) by (le, type))", + "expr": "histogram_quantile(0.99,sum(rate(client_java_seek_proxy_store_duration_bucket{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (le, type))", "hide": false, "interval": "", "legendFormat": "seek-proxy-store-99", @@ -1259,7 +1259,7 @@ "targets": [ { "exemplar": true, - "expr": "sum(rate(client_java_backoff_duration_sum{instance=~\"$instance\", type!=\"BoPDRPC\"}[$__rate_interval])) by (le, type)", + "expr": "sum(rate(client_java_backoff_duration_sum{instance=~\"$instance\", cluster=~\"$cluster\", type!=\"BoPDRPC\"}[$__rate_interval])) by (le, type)", "hide": false, "interval": "", "legendFormat": "{{type}}-total", @@ -1358,7 +1358,7 @@ "targets": [ { "exemplar": true, - "expr": "sum(rate(client_java_backoff_duration_count{instance=~\"$instance\"}[$__rate_interval])) by (le, type)", + "expr": "sum(rate(client_java_backoff_duration_count{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (le, type)", "interval": "", "legendFormat": "{{type}}-count", "queryType": "randomWalk", @@ -1459,7 +1459,7 @@ "targets": [ { "exemplar": true, - "expr": "sum(rate(client_java_backoff_duration_sum{instance=~\"$instance\", type!=\"BoPDRPC\"}[$__rate_interval])) by (le, type) / sum(rate(client_java_backoff_duration_count{instance=~\"$instance\", type!=\"BoPDRPC\"}[$__rate_interval])) by (le, type)", + "expr": "sum(rate(client_java_backoff_duration_sum{instance=~\"$instance\", cluster=~\"$cluster\", type!=\"BoPDRPC\"}[$__rate_interval])) by (le, type) / sum(rate(client_java_backoff_duration_count{instance=~\"$instance\", cluster=~\"$cluster\", type!=\"BoPDRPC\"}[$__rate_interval])) by (le, type)", "interval": "", "legendFormat": "{{type}}-avg", "queryType": "randomWalk", @@ -1573,7 +1573,7 @@ "targets": [ { "exemplar": true, - "expr": "sum(rate(client_java_get_region_by_requests_latency_sum{instance=~\"$instance\"}[$__rate_interval])) / sum(rate(client_java_get_region_by_requests_latency_count{instance=~\"$instance\"}[$__rate_interval]))", + "expr": "sum(rate(client_java_get_region_by_requests_latency_sum{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) / sum(rate(client_java_get_region_by_requests_latency_count{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval]))", "interval": "", "legendFormat": "avg", "queryType": "randomWalk", @@ -1581,7 +1581,7 @@ }, { "exemplar": true, - "expr": "histogram_quantile(0.99, sum(rate(client_java_get_region_by_requests_latency_bucket{instance=~\"$instance\"}[$__rate_interval])) by (le))", + "expr": "histogram_quantile(0.99, sum(rate(client_java_get_region_by_requests_latency_bucket{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (le))", "hide": false, "interval": "", "legendFormat": "99th", @@ -1680,7 +1680,7 @@ 
"targets": [ { "exemplar": true, - "expr": "1 - sum(rate(client_java_grpc_single_requests_latency_count{instance=~\"$instance\", type=\"pdpb.PD/GetRegion\"}[$__rate_interval])) / sum(rate(client_java_get_region_by_requests_latency_count{instance=~\"$instance\"}[$__rate_interval]))", + "expr": "1 - sum(rate(client_java_grpc_single_requests_latency_count{instance=~\"$instance\", cluster=~\"$cluster\", type=\"pdpb.PD/GetRegion\"}[$__rate_interval])) / sum(rate(client_java_get_region_by_requests_latency_count{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval]))", "interval": "", "legendFormat": "hit ratio", "queryType": "randomWalk", @@ -1788,7 +1788,7 @@ "targets": [ { "exemplar": true, - "expr": "histogram_quantile(0.99,sum(rate(client_java_grpc_single_requests_latency_bucket{instance=~\"$instance\", type=\"pdpb.PD/GetStore\"}[$__rate_interval])) by (le, type))", + "expr": "histogram_quantile(0.99,sum(rate(client_java_grpc_single_requests_latency_bucket{instance=~\"$instance\", cluster=~\"$cluster\", type=\"pdpb.PD/GetStore\"}[$__rate_interval])) by (le, type))", "interval": "", "legendFormat": "{{ type }}-99th", "queryType": "randomWalk", @@ -1796,7 +1796,7 @@ }, { "exemplar": true, - "expr": "histogram_quantile(0.99,sum(rate(client_java_grpc_single_requests_latency_bucket{instance=~\"$instance\", type=\"pdpb.PD/GetRegion\"}[$__rate_interval])) by (le, type))", + "expr": "histogram_quantile(0.99,sum(rate(client_java_grpc_single_requests_latency_bucket{instance=~\"$instance\", cluster=~\"$cluster\", type=\"pdpb.PD/GetRegion\"}[$__rate_interval])) by (le, type))", "hide": false, "interval": "", "legendFormat": "{{ type }}-99th", @@ -1804,7 +1804,7 @@ }, { "exemplar": true, - "expr": "histogram_quantile(0.99,sum(rate(client_java_grpc_single_requests_latency_bucket{instance=~\"$instance\", type=\"pdpb.PD/GetMembers\"}[$__rate_interval])) by (le, type))", + "expr": "histogram_quantile(0.99,sum(rate(client_java_grpc_single_requests_latency_bucket{instance=~\"$instance\", cluster=~\"$cluster\", type=\"pdpb.PD/GetMembers\"}[$__rate_interval])) by (le, type))", "hide": false, "interval": "", "legendFormat": "{{ type }}-99th", @@ -1904,7 +1904,7 @@ "targets": [ { "exemplar": true, - "expr": "sum(rate(client_java_grpc_single_requests_latency_count{instance=~\"$instance\", type=\"pdpb.PD/GetRegion\"}[$__rate_interval])) by (type)", + "expr": "sum(rate(client_java_grpc_single_requests_latency_count{instance=~\"$instance\", cluster=~\"$cluster\", type=\"pdpb.PD/GetRegion\"}[$__rate_interval])) by (type)", "hide": false, "interval": "", "legendFormat": "{{type}}", @@ -1912,7 +1912,7 @@ }, { "exemplar": true, - "expr": "sum(rate(client_java_grpc_single_requests_latency_count{instance=~\"$instance\", type=\"pdpb.PD/GetStore\"}[$__rate_interval])) by (type)", + "expr": "sum(rate(client_java_grpc_single_requests_latency_count{instance=~\"$instance\", cluster=~\"$cluster\", type=\"pdpb.PD/GetStore\"}[$__rate_interval])) by (type)", "hide": false, "interval": "", "legendFormat": "{{type}}", @@ -1920,7 +1920,7 @@ }, { "exemplar": true, - "expr": "sum(rate(client_java_grpc_single_requests_latency_count{instance=~\"$instance\", type=\"pdpb.PD/GetMembers\"}[$__rate_interval])) by (type)", + "expr": "sum(rate(client_java_grpc_single_requests_latency_count{instance=~\"$instance\", cluster=~\"$cluster\", type=\"pdpb.PD/GetMembers\"}[$__rate_interval])) by (type)", "hide": false, "interval": "", "legendFormat": "{{type}}", @@ -2021,7 +2021,7 @@ "targets": [ { "exemplar": true, - "expr": 
"sum(rate(client_java_backoff_duration_sum{instance=~\"$instance\", type=\"BoPDRPC\"}[$__rate_interval])) by (le, type) / sum(rate(client_java_backoff_duration_count{instance=~\"$instance\", type=\"BoPDRPC\"}[$__rate_interval])) by (le, type)", + "expr": "sum(rate(client_java_backoff_duration_sum{instance=~\"$instance\", cluster=~\"$cluster\", type=\"BoPDRPC\"}[$__rate_interval])) by (le, type) / sum(rate(client_java_backoff_duration_count{instance=~\"$instance\", cluster=~\"$cluster\", type=\"BoPDRPC\"}[$__rate_interval])) by (le, type)", "interval": "", "legendFormat": "{{type}}-avg", "queryType": "randomWalk", @@ -2029,7 +2029,7 @@ }, { "exemplar": true, - "expr": "sum(rate(client_java_backoff_duration_sum{instance=~\"$instance\", type=\"BoPDRPC\"}[$__rate_interval])) by (le, type)", + "expr": "sum(rate(client_java_backoff_duration_sum{instance=~\"$instance\", cluster=~\"$cluster\", type=\"BoPDRPC\"}[$__rate_interval])) by (le, type)", "hide": false, "interval": "", "legendFormat": "{{type}}-sum", @@ -2128,7 +2128,7 @@ "targets": [ { "exemplar": true, - "expr": "sum(rate(client_java_backoff_duration_count{instance=~\"$instance\", type=\"BoPDRPC\"}[$__rate_interval])) by (le, type)", + "expr": "sum(rate(client_java_backoff_duration_count{instance=~\"$instance\", cluster=~\"$cluster\", type=\"BoPDRPC\"}[$__rate_interval])) by (le, type)", "interval": "", "legendFormat": "{{type}}", "queryType": "randomWalk", @@ -2350,7 +2350,7 @@ "targets": [ { "exemplar": true, - "expr": "sum(rate(client_java_smart_raw_requests_latency_count{instance=~\"$instance\"}[$__rate_interval])) by (type)", + "expr": "sum(rate(client_java_smart_raw_requests_latency_count{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (type)", "format": "time_series", "interval": "", "intervalFactor": 1, @@ -2451,7 +2451,7 @@ "targets": [ { "exemplar": true, - "expr": "sum(rate(client_java_smart_raw_requests_failure_total{instance=~\"$instance\"}[$__rate_interval])) by (type)", + "expr": "sum(rate(client_java_smart_raw_requests_failure_total{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (type)", "format": "time_series", "interval": "", "intervalFactor": 1, @@ -2592,7 +2592,7 @@ "targets": [ { "exemplar": true, - "expr": "histogram_quantile(0.99, sum(rate(client_java_smart_raw_requests_latency_bucket{instance=~\"$instance\"}[$__rate_interval])) by (le, type))", + "expr": "histogram_quantile(0.99, sum(rate(client_java_smart_raw_requests_latency_bucket{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (le, type))", "format": "time_series", "interval": "", "intervalFactor": 1, @@ -2742,7 +2742,7 @@ "targets": [ { "exemplar": true, - "expr": "sum(rate(client_java_smart_raw_requests_latency_sum{instance=~\"$instance\"}[$__rate_interval])) by (type) / sum(rate(client_java_smart_raw_requests_latency_count{instance=~\"$instance\"}[$__rate_interval])) by (type)", + "expr": "sum(rate(client_java_smart_raw_requests_latency_sum{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (type) / sum(rate(client_java_smart_raw_requests_latency_count{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (type)", "format": "time_series", "hide": false, "interval": "", @@ -2853,7 +2853,7 @@ "targets": [ { "exemplar": true, - "expr": "sum(rate(client_java_smart_raw_circuit_breaker_opened_total{instance=~\"$instance\"}[$__rate_interval])) by (type)", + "expr": "sum(rate(client_java_smart_raw_circuit_breaker_opened_total{instance=~\"$instance\", 
cluster=~\"$cluster\"}[$__rate_interval])) by (type)", "format": "time_series", "interval": "", "intervalFactor": 1, @@ -2954,7 +2954,7 @@ "targets": [ { "exemplar": true, - "expr": "sum(rate(client_java_circuit_breaker_attempt_counter_total{instance=~\"$instance\"}[$__rate_interval])) by (type)", + "expr": "sum(rate(client_java_circuit_breaker_attempt_counter_total{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (type)", "format": "time_series", "interval": "", "intervalFactor": 1, @@ -5151,6 +5151,37 @@ "tagsQuery": "", "type": "query", "useTags": false + }, + { + "allValue": ".*", + "current": { + "selected": true, + "text": "All", + "value": "$__all" + }, + "datasource": "${DS_TEST-CLUSTER}", + "definition": "label_values(client_java_raw_requests_latency_count, cluster)", + "description": null, + "error": null, + "hide": 0, + "includeAll": true, + "label": "cluster", + "multi": false, + "name": "cluster", + "options": [], + "query": { + "query": "label_values(client_java_raw_requests_latency_count, cluster)", + "refId": "StandardVariableQuery" + }, + "refresh": 1, + "regex": "", + "skipUrlSync": false, + "sort": 0, + "tagValuesQuery": "", + "tags": [], + "tagsQuery": "", + "type": "query", + "useTags": false } ] }, diff --git a/src/main/java/org/tikv/common/AbstractGRPCClient.java b/src/main/java/org/tikv/common/AbstractGRPCClient.java index f13b6a66b8a..caa3cc99b1b 100644 --- a/src/main/java/org/tikv/common/AbstractGRPCClient.java +++ b/src/main/java/org/tikv/common/AbstractGRPCClient.java @@ -82,18 +82,16 @@ public RespT callWithRetry( if (logger.isTraceEnabled()) { logger.trace(String.format("Calling %s...", method.getFullMethodName())); } - RetryPolicy.Builder builder = new Builder<>(backOffer); + RetryPolicy policy = new Builder(backOffer).create(handler); RespT resp = - builder - .create(handler) - .callWithRetry( - () -> { - BlockingStubT stub = getBlockingStub(); - return ClientCalls.blockingUnaryCall( - stub.getChannel(), method, stub.getCallOptions(), requestFactory.get()); - }, - method.getFullMethodName(), - backOffer); + policy.callWithRetry( + () -> { + BlockingStubT stub = getBlockingStub(); + return ClientCalls.blockingUnaryCall( + stub.getChannel(), method, stub.getCallOptions(), requestFactory.get()); + }, + method.getFullMethodName(), + backOffer); if (logger.isTraceEnabled()) { logger.trace(String.format("leaving %s...", method.getFullMethodName())); @@ -109,20 +107,18 @@ protected void callAsyncWithRetry( ErrorHandler handler) { logger.debug(String.format("Calling %s...", method.getFullMethodName())); - RetryPolicy.Builder builder = new Builder<>(backOffer); - builder - .create(handler) - .callWithRetry( - () -> { - FutureStubT stub = getAsyncStub(); - ClientCalls.asyncUnaryCall( - stub.getChannel().newCall(method, stub.getCallOptions()), - requestFactory.get(), - responseObserver); - return null; - }, - method.getFullMethodName(), - backOffer); + RetryPolicy policy = new Builder(backOffer).create(handler); + policy.callWithRetry( + () -> { + FutureStubT stub = getAsyncStub(); + ClientCalls.asyncUnaryCall( + stub.getChannel().newCall(method, stub.getCallOptions()), + requestFactory.get(), + responseObserver); + return null; + }, + method.getFullMethodName(), + backOffer); logger.debug(String.format("leaving %s...", method.getFullMethodName())); } @@ -133,18 +129,17 @@ StreamObserver callBidiStreamingWithRetry( ErrorHandler> handler) { logger.debug(String.format("Calling %s...", method.getFullMethodName())); - RetryPolicy.Builder> builder = 
new Builder<>(backOffer); + RetryPolicy> policy = + new Builder>(backOffer).create(handler); StreamObserver observer = - builder - .create(handler) - .callWithRetry( - () -> { - FutureStubT stub = getAsyncStub(); - return asyncBidiStreamingCall( - stub.getChannel().newCall(method, stub.getCallOptions()), responseObserver); - }, - method.getFullMethodName(), - backOffer); + policy.callWithRetry( + () -> { + FutureStubT stub = getAsyncStub(); + return asyncBidiStreamingCall( + stub.getChannel().newCall(method, stub.getCallOptions()), responseObserver); + }, + method.getFullMethodName(), + backOffer); logger.debug(String.format("leaving %s...", method.getFullMethodName())); return observer; } @@ -156,19 +151,18 @@ public StreamingResponse callServerStreamingWithRetry( ErrorHandler handler) { logger.debug(String.format("Calling %s...", method.getFullMethodName())); - RetryPolicy.Builder builder = new Builder<>(backOffer); + RetryPolicy policy = + new Builder(backOffer).create(handler); StreamingResponse response = - builder - .create(handler) - .callWithRetry( - () -> { - BlockingStubT stub = getBlockingStub(); - return new StreamingResponse( - blockingServerStreamingCall( - stub.getChannel(), method, stub.getCallOptions(), requestFactory.get())); - }, - method.getFullMethodName(), - backOffer); + policy.callWithRetry( + () -> { + BlockingStubT stub = getBlockingStub(); + return new StreamingResponse( + blockingServerStreamingCall( + stub.getChannel(), method, stub.getCallOptions(), requestFactory.get())); + }, + method.getFullMethodName(), + backOffer); logger.debug(String.format("leaving %s...", method.getFullMethodName())); return response; } diff --git a/src/main/java/org/tikv/common/KVClient.java b/src/main/java/org/tikv/common/KVClient.java index 873f33f2fe2..6ae3a909771 100644 --- a/src/main/java/org/tikv/common/KVClient.java +++ b/src/main/java/org/tikv/common/KVClient.java @@ -65,7 +65,9 @@ public void close() {} * @return a ByteString value if key exists, ByteString.EMPTY if key does not exist */ public ByteString get(ByteString key, long version) throws GrpcException { - BackOffer backOffer = ConcreteBackOffer.newGetBackOff(); + BackOffer backOffer = + ConcreteBackOffer.newGetBackOff( + clientBuilder.getRegionManager().getPDClient().getClusterId()); while (true) { RegionStoreClient client = clientBuilder.build(key); try { diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 4b8a283c40c..80633fe2ca6 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -127,6 +127,7 @@ public class PDClient extends AbstractGRPCClient HistogramUtils.buildDuration() .name("client_java_pd_get_region_by_requests_latency") .help("pd getRegionByKey request latency.") + .labelNames("cluster") .register(); private PDClient(TiConfiguration conf, ChannelFactory channelFactory) { @@ -281,7 +282,7 @@ private GetOperatorResponse getOperator(long regionId) { () -> GetOperatorRequest.newBuilder().setHeader(header).setRegionId(regionId).build(); // get operator no need to handle error and no need back offer. 
return callWithRetry( - ConcreteBackOffer.newCustomBackOff(0), + ConcreteBackOffer.newCustomBackOff(0, getClusterId()), PDGrpc.getGetOperatorMethod(), request, new NoopHandler<>()); @@ -309,7 +310,8 @@ private boolean isScatterRegionFinish(GetOperatorResponse resp) { @Override public Pair getRegionByKey(BackOffer backOffer, ByteString key) { - Histogram.Timer requestTimer = PD_GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer(); + Histogram.Timer requestTimer = + PD_GET_REGION_BY_KEY_REQUEST_LATENCY.labels(getClusterId().toString()).startTimer(); try { if (conf.isTxnKVMode()) { CodecDataOutput cdo = new CodecDataOutput(); @@ -841,7 +843,7 @@ private Metapb.Region decodeRegion(Metapb.Region region) { return builder.build(); } - public long getClusterId() { + public Long getClusterId() { return header.getClusterId(); } diff --git a/src/main/java/org/tikv/common/ReadOnlyPDClient.java b/src/main/java/org/tikv/common/ReadOnlyPDClient.java index c4bbfdbde7f..d5a7c3aea87 100644 --- a/src/main/java/org/tikv/common/ReadOnlyPDClient.java +++ b/src/main/java/org/tikv/common/ReadOnlyPDClient.java @@ -67,4 +67,6 @@ List scanRegions( List getAllStores(BackOffer backOffer); TiConfiguration.ReplicaRead getReplicaRead(); + + Long getClusterId(); } diff --git a/src/main/java/org/tikv/common/Snapshot.java b/src/main/java/org/tikv/common/Snapshot.java index 6f7acc7f4f4..7012bc749ec 100644 --- a/src/main/java/org/tikv/common/Snapshot.java +++ b/src/main/java/org/tikv/common/Snapshot.java @@ -80,7 +80,9 @@ public List batchGet(int backOffer, List kvPairList = client.batchGet( - ConcreteBackOffer.newCustomBackOff(backOffer), list, timestamp.getVersion()); + ConcreteBackOffer.newCustomBackOff(backOffer, session.getPDClient().getClusterId()), + list, + timestamp.getVersion()); return kvPairList .stream() .map( diff --git a/src/main/java/org/tikv/common/StoreVersion.java b/src/main/java/org/tikv/common/StoreVersion.java index c23ce496840..c15b5695398 100644 --- a/src/main/java/org/tikv/common/StoreVersion.java +++ b/src/main/java/org/tikv/common/StoreVersion.java @@ -62,7 +62,8 @@ public static int compareTo(String v0, String v1) { public static boolean minTiKVVersion(String version, PDClient pdClient) { StoreVersion storeVersion = new StoreVersion(version); - BackOffer bo = ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF); + BackOffer bo = + ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF, pdClient.getClusterId()); List storeList = pdClient .getAllStores(bo) diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index 8dd1793cbca..61aa2456468 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -158,7 +158,7 @@ public TiSession(TiConfiguration conf) { if (conf.isWarmUpEnable() && conf.isRawKVMode()) { warmUp(); } - this.circuitBreaker = new CircuitBreakerImpl(conf); + this.circuitBreaker = new CircuitBreakerImpl(conf, client.getClusterId()); logger.info("TiSession initialized in " + conf.getKvMode() + " mode"); } @@ -179,7 +179,7 @@ private static VersionInfo getVersionInfo() { private synchronized void warmUp() { long warmUpStartTime = System.nanoTime(); - BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff(); + BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff(getPDClient().getClusterId()); try { // let JVM ClassLoader load gRPC error related classes // this operation may cost 100ms @@ -329,7 +329,8 @@ public TiConfiguration getConf() { public TiTimestamp getTimestamp() { 
checkIsClosed(); - return getPDClient().getTimestamp(ConcreteBackOffer.newTsoBackOff()); + return getPDClient() + .getTimestamp(ConcreteBackOffer.newTsoBackOff(getPDClient().getClusterId())); } public Snapshot createSnapshot() { @@ -586,13 +587,16 @@ public void splitRegionAndScatter( .stream() .map(k -> Key.toRawKey(k).toByteString()) .collect(Collectors.toList()), - ConcreteBackOffer.newCustomBackOff(splitRegionBackoffMS)); + ConcreteBackOffer.newCustomBackOff(splitRegionBackoffMS, getPDClient().getClusterId())); // scatter region for (Metapb.Region newRegion : newRegions) { try { getPDClient() - .scatterRegion(newRegion, ConcreteBackOffer.newCustomBackOff(scatterRegionBackoffMS)); + .scatterRegion( + newRegion, + ConcreteBackOffer.newCustomBackOff( + scatterRegionBackoffMS, getPDClient().getClusterId())); } catch (Exception e) { logger.warn(String.format("failed to scatter region: %d", newRegion.getId()), e); } @@ -609,7 +613,9 @@ public void splitRegionAndScatter( return; } getPDClient() - .waitScatterRegionFinish(newRegion, ConcreteBackOffer.newCustomBackOff((int) remainMS)); + .waitScatterRegionFinish( + newRegion, + ConcreteBackOffer.newCustomBackOff((int) remainMS, getPDClient().getClusterId())); } } else { logger.info("skip to wait scatter region finish"); diff --git a/src/main/java/org/tikv/common/importer/ImporterClient.java b/src/main/java/org/tikv/common/importer/ImporterClient.java index 054e85caa12..882880ab00c 100644 --- a/src/main/java/org/tikv/common/importer/ImporterClient.java +++ b/src/main/java/org/tikv/common/importer/ImporterClient.java @@ -259,7 +259,9 @@ private void ingest() throws GrpcException { } Object writeResponse = clientLeader.getWriteResponse(); - BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(BackOffer.INGEST_BACKOFF); + BackOffer backOffer = + ConcreteBackOffer.newCustomBackOff( + BackOffer.INGEST_BACKOFF, tiSession.getPDClient().getClusterId()); ingestWithRetry(writeResponse, backOffer); } diff --git a/src/main/java/org/tikv/common/importer/SwitchTiKVModeClient.java b/src/main/java/org/tikv/common/importer/SwitchTiKVModeClient.java index 78429c4b694..12bb3a065fe 100644 --- a/src/main/java/org/tikv/common/importer/SwitchTiKVModeClient.java +++ b/src/main/java/org/tikv/common/importer/SwitchTiKVModeClient.java @@ -73,7 +73,8 @@ private void switchTiKVToImportMode() { } private void doSwitchTiKVMode(ImportSstpb.SwitchMode mode) { - BackOffer bo = ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF); + BackOffer bo = + ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF, pdClient.getClusterId()); List allStores = pdClient.getAllStores(bo); for (Metapb.Store store : allStores) { ImporterStoreClient client = builder.build(new TiStore(store)); diff --git a/src/main/java/org/tikv/common/log/SlowLog.java b/src/main/java/org/tikv/common/log/SlowLog.java index ad137100c01..5d679a28019 100644 --- a/src/main/java/org/tikv/common/log/SlowLog.java +++ b/src/main/java/org/tikv/common/log/SlowLog.java @@ -36,5 +36,7 @@ default SlowLog withField(String key, Object value) { return withFields(ImmutableMap.of(key, value)); } + Object getField(String key); + void log(); } diff --git a/src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java b/src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java index ed4bbb1b834..0e65cc137af 100644 --- a/src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java +++ b/src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java @@ -47,6 +47,11 @@ public SlowLog withFields(Map fields) { return this; } + @Override + 
public Object getField(String key) { + return null; + } + @Override public void log() {} } diff --git a/src/main/java/org/tikv/common/log/SlowLogImpl.java b/src/main/java/org/tikv/common/log/SlowLogImpl.java index 0c75211263f..fbf38fad26b 100644 --- a/src/main/java/org/tikv/common/log/SlowLogImpl.java +++ b/src/main/java/org/tikv/common/log/SlowLogImpl.java @@ -92,6 +92,11 @@ public SlowLog withFields(Map fields) { return this; } + @Override + public Object getField(String key) { + return fields.get(key); + } + @Override public void log() { recordTime(); diff --git a/src/main/java/org/tikv/common/operation/iterator/ConcreteScanIterator.java b/src/main/java/org/tikv/common/operation/iterator/ConcreteScanIterator.java index c366079551f..8d1659b1d6d 100644 --- a/src/main/java/org/tikv/common/operation/iterator/ConcreteScanIterator.java +++ b/src/main/java/org/tikv/common/operation/iterator/ConcreteScanIterator.java @@ -88,7 +88,8 @@ private ByteString resolveCurrentLock(Kvrpcpb.KvPair current) { builder.getRegionManager().getRegionStorePairByKey(current.getKey()); TiRegion region = pair.first; TiStore store = pair.second; - BackOffer backOffer = ConcreteBackOffer.newGetBackOff(); + BackOffer backOffer = + ConcreteBackOffer.newGetBackOff(builder.getRegionManager().getPDClient().getClusterId()); try (RegionStoreClient client = builder.build(region, store)) { return client.get(backOffer, current.getKey(), version); } catch (Exception e) { diff --git a/src/main/java/org/tikv/common/policy/RetryPolicy.java b/src/main/java/org/tikv/common/policy/RetryPolicy.java index 1d604155ebc..4dc05d95b84 100644 --- a/src/main/java/org/tikv/common/policy/RetryPolicy.java +++ b/src/main/java/org/tikv/common/policy/RetryPolicy.java @@ -35,19 +35,19 @@ public abstract class RetryPolicy { HistogramUtils.buildDuration() .name("client_java_grpc_single_requests_latency") .help("grpc request latency.") - .labelNames("type") + .labelNames("type", "cluster") .register(); public static final Histogram CALL_WITH_RETRY_DURATION = HistogramUtils.buildDuration() .name("client_java_call_with_retry_duration") .help("callWithRetry duration.") - .labelNames("type") + .labelNames("type", "cluster") .register(); public static final Counter GRPC_REQUEST_RETRY_NUM = Counter.build() .name("client_java_grpc_requests_retry_num") .help("grpc request retry num.") - .labelNames("type") + .labelNames("type", "cluster") .register(); // handles PD and TiKV's error. 
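// Illustrative sketch, not part of the patch above: once a metric is registered with
// labelNames("type", "cluster") as in the hunk above, every labels(...) call must supply both
// values in that order, otherwise the Prometheus simpleclient throws IllegalArgumentException.
// Histogram.build() below stands in for the project's HistogramUtils.buildDuration() helper,
// and the class/method names are hypothetical.
import io.prometheus.client.Histogram;

public class ClusterLabelSketch {
  static final Histogram GRPC_SINGLE_REQUEST_LATENCY =
      Histogram.build()
          .name("client_java_grpc_single_requests_latency")
          .help("grpc request latency.")
          .labelNames("type", "cluster")
          .register();

  static void timeCall(String methodName, Long clusterId, Runnable call) {
    // Both label values are required; the cluster id is stringified, matching callWithRetry below.
    Histogram.Timer timer =
        GRPC_SINGLE_REQUEST_LATENCY.labels(methodName, clusterId.toString()).startTimer();
    try {
      call.run();
    } finally {
      timer.observeDuration();
    }
  }
}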
@@ -72,7 +72,8 @@ private void rethrowNotRecoverableException(Exception e) { } public RespT callWithRetry(Callable proc, String methodName, BackOffer backOffer) { - Histogram.Timer callWithRetryTimer = CALL_WITH_RETRY_DURATION.labels(methodName).startTimer(); + String[] labels = new String[] {methodName, backOffer.getClusterId().toString()}; + Histogram.Timer callWithRetryTimer = CALL_WITH_RETRY_DURATION.labels(labels).startTimer(); SlowLogSpan callWithRetrySlowLogSpan = backOffer.getSlowLog().start("callWithRetry"); callWithRetrySlowLogSpan.addProperty("method", methodName); try { @@ -80,8 +81,7 @@ public RespT callWithRetry(Callable proc, String methodName, BackOffer ba RespT result = null; try { // add single request duration histogram - Histogram.Timer requestTimer = - GRPC_SINGLE_REQUEST_LATENCY.labels(methodName).startTimer(); + Histogram.Timer requestTimer = GRPC_SINGLE_REQUEST_LATENCY.labels(labels).startTimer(); SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("gRPC"); slowLogSpan.addProperty("method", methodName); try { @@ -96,7 +96,7 @@ public RespT callWithRetry(Callable proc, String methodName, BackOffer ba backOffer.checkTimeout(); boolean retry = handler.handleRequestError(backOffer, e); if (retry) { - GRPC_REQUEST_RETRY_NUM.labels(methodName).inc(); + GRPC_REQUEST_RETRY_NUM.labels(labels).inc(); continue; } else { return result; @@ -107,7 +107,7 @@ public RespT callWithRetry(Callable proc, String methodName, BackOffer ba if (handler != null) { boolean retry = handler.handleResponseError(backOffer, result); if (retry) { - GRPC_REQUEST_RETRY_NUM.labels(methodName).inc(); + GRPC_REQUEST_RETRY_NUM.labels(labels).inc(); continue; } } diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 1dc6c321fc4..8bbaf91f599 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -55,12 +55,14 @@ public abstract class AbstractRegionStoreClient HistogramUtils.buildDuration() .name("client_java_seek_leader_store_duration") .help("seek leader store duration.") + .labelNames("cluster") .register(); public static final Histogram SEEK_PROXY_STORE_DURATION = HistogramUtils.buildDuration() .name("client_java_seek_proxy_store_duration") .help("seek proxy store duration.") + .labelNames("cluster") .register(); protected final RegionManager regionManager; @@ -202,7 +204,10 @@ private void updateClientStub() { } private Boolean seekLeaderStore(BackOffer backOffer) { - Histogram.Timer switchLeaderDurationTimer = SEEK_LEADER_STORE_DURATION.startTimer(); + Histogram.Timer switchLeaderDurationTimer = + SEEK_LEADER_STORE_DURATION + .labels(regionManager.getPDClient().getClusterId().toString()) + .startTimer(); SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("seekLeaderStore"); try { List peers = region.getFollowerList(); @@ -251,7 +256,10 @@ private Boolean seekLeaderStore(BackOffer backOffer) { private boolean seekProxyStore(BackOffer backOffer) { SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("seekProxyStore"); - Histogram.Timer grpcForwardDurationTimer = SEEK_PROXY_STORE_DURATION.startTimer(); + Histogram.Timer grpcForwardDurationTimer = + SEEK_PROXY_STORE_DURATION + .labels(regionManager.getPDClient().getClusterId().toString()) + .startTimer(); try { logger.info(String.format("try grpc forward: region[%d]", region.getId())); // when current leader cannot be reached diff --git 
a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 75a23f30a88..8c1624e42e0 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -52,11 +52,13 @@ public class RegionManager { HistogramUtils.buildDuration() .name("client_java_get_region_by_requests_latency") .help("getRegionByKey request latency.") + .labelNames("cluster") .register(); public static final Histogram SCAN_REGIONS_REQUEST_LATENCY = HistogramUtils.buildDuration() .name("client_java_scan_regions_request_latency") .help("scanRegions request latency.") + .labelNames("cluster") .register(); // TODO: the region cache logic need rewrite. @@ -105,7 +107,9 @@ public void invalidateAll() { public List scanRegions( BackOffer backOffer, ByteString startKey, ByteString endKey, int limit) { - Histogram.Timer requestTimer = SCAN_REGIONS_REQUEST_LATENCY.startTimer(); + Long clusterId = pdClient.getClusterId(); + Histogram.Timer requestTimer = + SCAN_REGIONS_REQUEST_LATENCY.labels(clusterId.toString()).startTimer(); SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("scanRegions"); try { return pdClient.scanRegions(backOffer, startKey, endKey, limit); @@ -122,7 +126,9 @@ public TiRegion getRegionByKey(ByteString key) { } public TiRegion getRegionByKey(ByteString key, BackOffer backOffer) { - Histogram.Timer requestTimer = GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer(); + Long clusterId = pdClient.getClusterId(); + Histogram.Timer requestTimer = + GET_REGION_BY_KEY_REQUEST_LATENCY.labels(clusterId.toString()).startTimer(); SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("getRegionByKey"); TiRegion region = cache.getRegionByKey(key, backOffer); try { @@ -316,6 +322,7 @@ public void insertRegionToCache(TiRegion region) { } private BackOffer defaultBackOff() { - return ConcreteBackOffer.newCustomBackOff(conf.getRawKVDefaultBackoffInMS()); + return ConcreteBackOffer.newCustomBackOff( + conf.getRawKVDefaultBackoffInMS(), pdClient.getClusterId()); } } diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index 1c090a92a0f..66daeaa828c 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -18,7 +18,9 @@ package org.tikv.common.region; import static org.tikv.common.region.RegionStoreClient.RequestTypes.REQ_TYPE_DAG; -import static org.tikv.common.util.BackOffFunction.BackOffFuncType.*; +import static org.tikv.common.util.BackOffFunction.BackOffFuncType.BoRegionMiss; +import static org.tikv.common.util.BackOffFunction.BackOffFuncType.BoTxnLock; +import static org.tikv.common.util.BackOffFunction.BackOffFuncType.BoTxnLockFast; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; @@ -29,7 +31,17 @@ import io.grpc.Metadata; import io.grpc.stub.MetadataUtils; import io.prometheus.client.Histogram; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,15 +49,64 @@ import org.tikv.common.StoreVersion; import org.tikv.common.TiConfiguration; 
import org.tikv.common.Version; -import org.tikv.common.exception.*; +import org.tikv.common.exception.GrpcException; +import org.tikv.common.exception.KeyException; +import org.tikv.common.exception.RawCASConflictException; +import org.tikv.common.exception.RegionException; +import org.tikv.common.exception.SelectException; +import org.tikv.common.exception.TiClientInternalException; +import org.tikv.common.exception.TiKVException; import org.tikv.common.log.SlowLogEmptyImpl; import org.tikv.common.operation.KVErrorHandler; import org.tikv.common.operation.RegionErrorHandler; import org.tikv.common.streaming.StreamingResponse; -import org.tikv.common.util.*; +import org.tikv.common.util.BackOffFunction; +import org.tikv.common.util.BackOffer; +import org.tikv.common.util.Batch; +import org.tikv.common.util.ChannelFactory; +import org.tikv.common.util.ConcreteBackOffer; +import org.tikv.common.util.HistogramUtils; +import org.tikv.common.util.Pair; +import org.tikv.common.util.RangeSplitter; import org.tikv.kvproto.Coprocessor; import org.tikv.kvproto.Errorpb; -import org.tikv.kvproto.Kvrpcpb.*; +import org.tikv.kvproto.Kvrpcpb.BatchGetRequest; +import org.tikv.kvproto.Kvrpcpb.BatchGetResponse; +import org.tikv.kvproto.Kvrpcpb.CommitRequest; +import org.tikv.kvproto.Kvrpcpb.CommitResponse; +import org.tikv.kvproto.Kvrpcpb.GetRequest; +import org.tikv.kvproto.Kvrpcpb.GetResponse; +import org.tikv.kvproto.Kvrpcpb.KeyError; +import org.tikv.kvproto.Kvrpcpb.KvPair; +import org.tikv.kvproto.Kvrpcpb.Mutation; +import org.tikv.kvproto.Kvrpcpb.PrewriteRequest; +import org.tikv.kvproto.Kvrpcpb.PrewriteResponse; +import org.tikv.kvproto.Kvrpcpb.RawBatchDeleteRequest; +import org.tikv.kvproto.Kvrpcpb.RawBatchDeleteResponse; +import org.tikv.kvproto.Kvrpcpb.RawBatchGetRequest; +import org.tikv.kvproto.Kvrpcpb.RawBatchGetResponse; +import org.tikv.kvproto.Kvrpcpb.RawBatchPutRequest; +import org.tikv.kvproto.Kvrpcpb.RawBatchPutResponse; +import org.tikv.kvproto.Kvrpcpb.RawCASRequest; +import org.tikv.kvproto.Kvrpcpb.RawCASResponse; +import org.tikv.kvproto.Kvrpcpb.RawDeleteRangeRequest; +import org.tikv.kvproto.Kvrpcpb.RawDeleteRangeResponse; +import org.tikv.kvproto.Kvrpcpb.RawDeleteRequest; +import org.tikv.kvproto.Kvrpcpb.RawDeleteResponse; +import org.tikv.kvproto.Kvrpcpb.RawGetKeyTTLRequest; +import org.tikv.kvproto.Kvrpcpb.RawGetKeyTTLResponse; +import org.tikv.kvproto.Kvrpcpb.RawGetRequest; +import org.tikv.kvproto.Kvrpcpb.RawGetResponse; +import org.tikv.kvproto.Kvrpcpb.RawPutRequest; +import org.tikv.kvproto.Kvrpcpb.RawPutResponse; +import org.tikv.kvproto.Kvrpcpb.RawScanRequest; +import org.tikv.kvproto.Kvrpcpb.RawScanResponse; +import org.tikv.kvproto.Kvrpcpb.ScanRequest; +import org.tikv.kvproto.Kvrpcpb.ScanResponse; +import org.tikv.kvproto.Kvrpcpb.SplitRegionRequest; +import org.tikv.kvproto.Kvrpcpb.SplitRegionResponse; +import org.tikv.kvproto.Kvrpcpb.TxnHeartBeatRequest; +import org.tikv.kvproto.Kvrpcpb.TxnHeartBeatResponse; import org.tikv.kvproto.Metapb; import org.tikv.kvproto.TikvGrpc; import org.tikv.kvproto.TikvGrpc.TikvBlockingStub; @@ -78,7 +139,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { HistogramUtils.buildDuration() .name("client_java_grpc_raw_requests_latency") .help("grpc raw request latency.") - .labelNames("type") + .labelNames("type", "cluster") .register(); private synchronized Boolean getIsV4() { @@ -742,7 +803,7 @@ public Iterator coprocessStreaming( StreamingResponse responseIterator = this.callServerStreamingWithRetry( - 
ConcreteBackOffer.newCopNextMaxBackOff(), + ConcreteBackOffer.newCopNextMaxBackOff(pdClient.getClusterId()), TikvGrpc.getCoprocessorStreamMethod(), reqToSend, handler); @@ -778,7 +839,10 @@ public List splitRegion(Iterable splitKeys) { SplitRegionResponse resp = callWithRetry( - ConcreteBackOffer.newGetBackOff(), TikvGrpc.getSplitRegionMethod(), request, handler); + ConcreteBackOffer.newGetBackOff(pdClient.getClusterId()), + TikvGrpc.getSplitRegionMethod(), + request, + handler); if (resp == null) { this.regionManager.onRequestFail(region); @@ -798,8 +862,9 @@ public List splitRegion(Iterable splitKeys) { // APIs for Raw Scan/Put/Get/Delete public Optional rawGet(BackOffer backOffer, ByteString key) { + Long clusterId = pdClient.getClusterId(); Histogram.Timer requestTimer = - GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_get").startTimer(); + GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_get", clusterId.toString()).startTimer(); try { Supplier factory = () -> @@ -837,8 +902,11 @@ private Optional rawGetHelper(RawGetResponse resp) { } public Optional rawGetKeyTTL(BackOffer backOffer, ByteString key) { + Long clusterId = pdClient.getClusterId(); Histogram.Timer requestTimer = - GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_get_key_ttl").startTimer(); + GRPC_RAW_REQUEST_LATENCY + .labels("client_grpc_raw_get_key_ttl", clusterId.toString()) + .startTimer(); try { Supplier factory = () -> @@ -876,8 +944,11 @@ private Optional rawGetKeyTTLHelper(RawGetKeyTTLResponse resp) { } public void rawDelete(BackOffer backOffer, ByteString key, boolean atomicForCAS) { + Long clusterId = pdClient.getClusterId(); Histogram.Timer requestTimer = - GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_delete").startTimer(); + GRPC_RAW_REQUEST_LATENCY + .labels("client_grpc_raw_delete", clusterId.toString()) + .startTimer(); try { Supplier factory = () -> @@ -914,8 +985,9 @@ private void rawDeleteHelper(RawDeleteResponse resp, TiRegion region) { public void rawPut( BackOffer backOffer, ByteString key, ByteString value, long ttl, boolean atomicForCAS) { + Long clusterId = pdClient.getClusterId(); Histogram.Timer requestTimer = - GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_put").startTimer(); + GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_put", clusterId.toString()).startTimer(); try { Supplier factory = () -> @@ -958,8 +1030,11 @@ public void rawCompareAndSet( ByteString value, long ttl) throws RawCASConflictException { + Long clusterId = pdClient.getClusterId(); Histogram.Timer requestTimer = - GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_put_if_absent").startTimer(); + GRPC_RAW_REQUEST_LATENCY + .labels("client_grpc_raw_put_if_absent", clusterId.toString()) + .startTimer(); try { Supplier factory = () -> @@ -1008,8 +1083,11 @@ private void rawCompareAndSetHelper( } public List rawBatchGet(BackOffer backoffer, List keys) { + Long clusterId = pdClient.getClusterId(); Histogram.Timer requestTimer = - GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_batch_get").startTimer(); + GRPC_RAW_REQUEST_LATENCY + .labels("client_grpc_raw_batch_get", clusterId.toString()) + .startTimer(); try { if (keys.isEmpty()) { return new ArrayList<>(); @@ -1044,8 +1122,11 @@ private List handleRawBatchGet(RawBatchGetResponse resp) { public void rawBatchPut( BackOffer backOffer, List kvPairs, long ttl, boolean atomicForCAS) { + Long clusterId = pdClient.getClusterId(); Histogram.Timer requestTimer = - GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_batch_put").startTimer(); + GRPC_RAW_REQUEST_LATENCY + 
.labels("client_grpc_raw_batch_put", clusterId.toString()) + .startTimer(); try { if (kvPairs.isEmpty()) { return; @@ -1097,8 +1178,11 @@ private void handleRawBatchPut(RawBatchPutResponse resp) { } public void rawBatchDelete(BackOffer backoffer, List keys, boolean atomicForCAS) { + Long clusterId = pdClient.getClusterId(); Histogram.Timer requestTimer = - GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_batch_delete").startTimer(); + GRPC_RAW_REQUEST_LATENCY + .labels("client_grpc_raw_batch_delete", clusterId.toString()) + .startTimer(); try { if (keys.isEmpty()) { return; @@ -1145,8 +1229,9 @@ private void handleRawBatchDelete(RawBatchDeleteResponse resp) { * @return KvPair list */ public List rawScan(BackOffer backOffer, ByteString key, int limit, boolean keyOnly) { + Long clusterId = pdClient.getClusterId(); Histogram.Timer requestTimer = - GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_scan").startTimer(); + GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_scan", clusterId.toString()).startTimer(); try { Supplier factory = () -> @@ -1191,8 +1276,11 @@ private List rawScanHelper(RawScanResponse resp) { * @param endKey endKey */ public void rawDeleteRange(BackOffer backOffer, ByteString startKey, ByteString endKey) { + Long clusterId = pdClient.getClusterId(); Histogram.Timer requestTimer = - GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_delete_range").startTimer(); + GRPC_RAW_REQUEST_LATENCY + .labels("client_grpc_raw_delete_range", clusterId.toString()) + .startTimer(); try { Supplier factory = () -> @@ -1349,7 +1437,10 @@ public RegionManager getRegionManager() { } private BackOffer defaultBackOff() { - return ConcreteBackOffer.newCustomBackOff(conf.getRawKVDefaultBackoffInMS()); + BackOffer backoffer = + ConcreteBackOffer.newCustomBackOff( + conf.getRawKVDefaultBackoffInMS(), pdClient.getClusterId()); + return backoffer; } } } diff --git a/src/main/java/org/tikv/common/region/StoreHealthyChecker.java b/src/main/java/org/tikv/common/region/StoreHealthyChecker.java index 56c953d2961..c22186299e4 100644 --- a/src/main/java/org/tikv/common/region/StoreHealthyChecker.java +++ b/src/main/java/org/tikv/common/region/StoreHealthyChecker.java @@ -89,7 +89,9 @@ private boolean checkStoreHealth(TiStore store) { private boolean checkStoreTombstone(TiStore store) { try { - Metapb.Store newStore = pdClient.getStore(ConcreteBackOffer.newRawKVBackOff(), store.getId()); + Metapb.Store newStore = + pdClient.getStore( + ConcreteBackOffer.newRawKVBackOff(pdClient.getClusterId()), store.getId()); if (newStore != null && newStore.getState() == Metapb.StoreState.Tombstone) { return true; } diff --git a/src/main/java/org/tikv/common/util/BackOffer.java b/src/main/java/org/tikv/common/util/BackOffer.java index 911cf35ad63..9baf41397c7 100644 --- a/src/main/java/org/tikv/common/util/BackOffer.java +++ b/src/main/java/org/tikv/common/util/BackOffer.java @@ -70,4 +70,6 @@ enum BackOffStrategy { } SlowLog getSlowLog(); + + Long getClusterId(); } diff --git a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java index a874b477126..ac00d087fec 100644 --- a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java +++ b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java @@ -39,6 +39,7 @@ public class ConcreteBackOffer implements BackOffer { private static final Logger logger = LoggerFactory.getLogger(ConcreteBackOffer.class); private final int maxSleep; + private final Long clusterId; @VisibleForTesting public final Map backOffFunctionMap; 
@@ -52,14 +53,15 @@ public class ConcreteBackOffer implements BackOffer { HistogramUtils.buildDuration() .name("client_java_backoff_duration") .help("backoff duration.") - .labelNames("type") + .labelNames("type", "cluster") .register(); - private ConcreteBackOffer(int maxSleep, long deadline, SlowLog slowLog) { + private ConcreteBackOffer(int maxSleep, long deadline, SlowLog slowLog, long clusterId) { Preconditions.checkArgument( maxSleep == 0 || deadline == 0, "Max sleep time should be 0 or Deadline should be 0."); Preconditions.checkArgument(maxSleep >= 0, "Max sleep time cannot be less than 0."); Preconditions.checkArgument(deadline >= 0, "Deadline cannot be less than 0."); + this.clusterId = clusterId; this.maxSleep = maxSleep; this.errors = Collections.synchronizedList(new ArrayList<>()); this.backOffFunctionMap = new ConcurrentHashMap<>(); @@ -68,6 +70,7 @@ private ConcreteBackOffer(int maxSleep, long deadline, SlowLog slowLog) { } private ConcreteBackOffer(ConcreteBackOffer source) { + this.clusterId = source.clusterId; this.maxSleep = source.maxSleep; this.totalSleep = source.totalSleep; this.errors = source.errors; @@ -76,37 +79,54 @@ private ConcreteBackOffer(ConcreteBackOffer source) { this.slowLog = source.slowLog; } - public static ConcreteBackOffer newDeadlineBackOff(int timeoutInMs, SlowLog slowLog) { + public static ConcreteBackOffer newDeadlineBackOff( + int timeoutInMs, SlowLog slowLog, long clusterId) { long deadline = System.currentTimeMillis() + timeoutInMs; - return new ConcreteBackOffer(0, deadline, slowLog); + return new ConcreteBackOffer(0, deadline, slowLog, clusterId); + } + + public static ConcreteBackOffer newDeadlineBackOff(int timeoutInMs, SlowLog slowLog) { + return newDeadlineBackOff(timeoutInMs, slowLog, 0); + } + + public static ConcreteBackOffer newCustomBackOff(int maxSleep, long clusterId) { + return new ConcreteBackOffer(maxSleep, 0, SlowLogEmptyImpl.INSTANCE, clusterId); } public static ConcreteBackOffer newCustomBackOff(int maxSleep) { - return new ConcreteBackOffer(maxSleep, 0, SlowLogEmptyImpl.INSTANCE); + return newCustomBackOff(maxSleep, 0); } public static ConcreteBackOffer newScannerNextMaxBackOff() { - return new ConcreteBackOffer(SCANNER_NEXT_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE); + return new ConcreteBackOffer(SCANNER_NEXT_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE, 0); } public static ConcreteBackOffer newBatchGetMaxBackOff() { - return new ConcreteBackOffer(BATCH_GET_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE); + return new ConcreteBackOffer(BATCH_GET_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE, 0); } public static ConcreteBackOffer newCopNextMaxBackOff() { - return new ConcreteBackOffer(COP_NEXT_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE); + return newCopNextMaxBackOff(0); } - public static ConcreteBackOffer newGetBackOff() { - return new ConcreteBackOffer(GET_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE); + public static ConcreteBackOffer newCopNextMaxBackOff(long clusterId) { + return new ConcreteBackOffer(COP_NEXT_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE, clusterId); + } + + public static ConcreteBackOffer newGetBackOff(long clusterId) { + return new ConcreteBackOffer(GET_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE, clusterId); + } + + public static ConcreteBackOffer newRawKVBackOff(long clusterId) { + return new ConcreteBackOffer(RAWKV_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE, clusterId); } public static ConcreteBackOffer newRawKVBackOff() { - return new ConcreteBackOffer(RAWKV_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE); + return 
newRawKVBackOff(0); } - public static ConcreteBackOffer newTsoBackOff() { - return new ConcreteBackOffer(TSO_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE); + public static ConcreteBackOffer newTsoBackOff(long clusterId) { + return new ConcreteBackOffer(TSO_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE, clusterId); } public static ConcreteBackOffer create(BackOffer source) { @@ -173,7 +193,8 @@ public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType) { } public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType, long maxSleepMs) { - Histogram.Timer backOffTimer = BACKOFF_DURATION.labels(funcType.name()).startTimer(); + String[] labels = new String[] {funcType.name(), clusterId.toString()}; + Histogram.Timer backOffTimer = BACKOFF_DURATION.labels(labels).startTimer(); SlowLogSpan slowLogSpan = getSlowLog().start("backoff"); slowLogSpan.addProperty("type", funcType.name()); BackOffFunction backOffFunction = @@ -239,4 +260,8 @@ private void logThrowError(Exception err) { public SlowLog getSlowLog() { return slowLog; } + + public Long getClusterId() { + return clusterId; + } } diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index d4bbeebef78..45fd0699ee9 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -73,7 +73,7 @@ import org.tikv.kvproto.Kvrpcpb.KvPair; public class RawKVClient implements RawKVClientBase { - private final long clusterId; + private final Long clusterId; private final List pdAddresses; private final TiSession tiSession; private final RegionStoreClientBuilder clientBuilder; @@ -90,21 +90,21 @@ public class RawKVClient implements RawKVClientBase { HistogramUtils.buildDuration() .name("client_java_raw_requests_latency") .help("client raw request latency.") - .labelNames("type") + .labelNames("type", "cluster") .register(); public static final Counter RAW_REQUEST_SUCCESS = Counter.build() .name("client_java_raw_requests_success") .help("client raw request success.") - .labelNames("type") + .labelNames("type", "cluster") .register(); public static final Counter RAW_REQUEST_FAILURE = Counter.build() .name("client_java_raw_requests_failure") .help("client raw request failure.") - .labelNames("type") + .labelNames("type", "cluster") .register(); private static final TiKVException ERR_MAX_SCAN_LIMIT_EXCEEDED = @@ -130,6 +130,10 @@ private SlowLog withClusterInfo(SlowLog logger) { return logger.withField("cluster_id", clusterId).withField("pd_addresses", pdAddresses); } + private String[] withClusterId(String label) { + return new String[] {label, clusterId.toString()}; + } + @Override public void close() {} @@ -140,21 +144,21 @@ public void put(ByteString key, ByteString value) { @Override public void put(ByteString key, ByteString value, long ttl) { - String label = "client_raw_put"; - Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + String[] labels = withClusterId("client_raw_put"); + Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer(); SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS())); SlowLogSpan span = slowLog.start("put"); span.addProperty("key", KeyUtils.formatBytesUTF8(key)); ConcreteBackOffer backOffer = - ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog); + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog, clusterId); try { while (true) { try (RegionStoreClient client = 
clientBuilder.build(key, backOffer)) { span.addProperty("region", client.getRegion().toString()); client.rawPut(backOffer, key, value, ttl, atomicForCAS); - RAW_REQUEST_SUCCESS.labels(label).inc(); + RAW_REQUEST_SUCCESS.labels(labels).inc(); return; } catch (final TiKVException e) { backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); @@ -162,7 +166,7 @@ public void put(ByteString key, ByteString value, long ttl) { } } } catch (Exception e) { - RAW_REQUEST_FAILURE.labels(label).inc(); + RAW_REQUEST_FAILURE.labels(labels).inc(); slowLog.setError(e); throw e; } finally { @@ -202,21 +206,21 @@ public void compareAndSet( "To use compareAndSet or putIfAbsent, please enable the config tikv.enable_atomic_for_cas."); } - String label = "client_raw_compare_and_set"; - Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + String[] labels = withClusterId("client_raw_compare_and_set"); + Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer(); SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS())); SlowLogSpan span = slowLog.start("putIfAbsent"); span.addProperty("key", KeyUtils.formatBytesUTF8(key)); ConcreteBackOffer backOffer = - ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog); + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog, clusterId); try { while (true) { try (RegionStoreClient client = clientBuilder.build(key, backOffer)) { span.addProperty("region", client.getRegion().toString()); client.rawCompareAndSet(backOffer, key, prevValue, value, ttl); - RAW_REQUEST_SUCCESS.labels(label).inc(); + RAW_REQUEST_SUCCESS.labels(labels).inc(); return; } catch (final TiKVException e) { backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); @@ -224,7 +228,7 @@ public void compareAndSet( } } } catch (Exception e) { - RAW_REQUEST_FAILURE.labels(label).inc(); + RAW_REQUEST_FAILURE.labels(labels).inc(); slowLog.setError(e); throw e; } finally { @@ -241,21 +245,22 @@ public void batchPut(Map kvPairs) { @Override public void batchPut(Map kvPairs, long ttl) { - String label = "client_raw_batch_put"; - Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + String[] labels = withClusterId("client_raw_batch_put"); + Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer(); SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS())); SlowLogSpan span = slowLog.start("batchPut"); span.addProperty("keySize", String.valueOf(kvPairs.size())); ConcreteBackOffer backOffer = - ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchWriteTimeoutInMS(), slowLog); + ConcreteBackOffer.newDeadlineBackOff( + conf.getRawKVBatchWriteTimeoutInMS(), slowLog, clusterId); try { long deadline = System.currentTimeMillis() + conf.getRawKVBatchWriteTimeoutInMS(); doSendBatchPut(backOffer, kvPairs, ttl, deadline); - RAW_REQUEST_SUCCESS.labels(label).inc(); + RAW_REQUEST_SUCCESS.labels(labels).inc(); } catch (Exception e) { - RAW_REQUEST_FAILURE.labels(label).inc(); + RAW_REQUEST_FAILURE.labels(labels).inc(); slowLog.setError(e); throw e; } finally { @@ -267,21 +272,21 @@ public void batchPut(Map kvPairs, long ttl) { @Override public Optional get(ByteString key) { - String label = "client_raw_get"; - Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + String[] labels = withClusterId("client_raw_get"); + Histogram.Timer requestTimer = 
RAW_REQUEST_LATENCY.labels(labels).startTimer(); SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVReadSlowLogInMS())); SlowLogSpan span = slowLog.start("get"); span.addProperty("key", KeyUtils.formatBytesUTF8(key)); ConcreteBackOffer backOffer = - ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS(), slowLog); + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS(), slowLog, clusterId); try { while (true) { try (RegionStoreClient client = clientBuilder.build(key, backOffer)) { span.addProperty("region", client.getRegion().toString()); Optional result = client.rawGet(backOffer, key); - RAW_REQUEST_SUCCESS.labels(label).inc(); + RAW_REQUEST_SUCCESS.labels(labels).inc(); return result; } catch (final TiKVException e) { backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); @@ -289,7 +294,7 @@ public Optional get(ByteString key) { } } } catch (Exception e) { - RAW_REQUEST_FAILURE.labels(label).inc(); + RAW_REQUEST_FAILURE.labels(labels).inc(); slowLog.setError(e); throw e; } finally { @@ -301,20 +306,21 @@ public Optional get(ByteString key) { @Override public List batchGet(List keys) { - String label = "client_raw_batch_get"; - Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + String[] labels = withClusterId("client_raw_batch_get"); + Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer(); SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchReadSlowLogInMS())); SlowLogSpan span = slowLog.start("batchGet"); span.addProperty("keySize", String.valueOf(keys.size())); ConcreteBackOffer backOffer = - ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchReadTimeoutInMS(), slowLog); + ConcreteBackOffer.newDeadlineBackOff( + conf.getRawKVBatchReadTimeoutInMS(), slowLog, clusterId); try { long deadline = System.currentTimeMillis() + conf.getRawKVBatchReadTimeoutInMS(); List result = doSendBatchGet(backOffer, keys, deadline); - RAW_REQUEST_SUCCESS.labels(label).inc(); + RAW_REQUEST_SUCCESS.labels(labels).inc(); return result; } catch (Exception e) { - RAW_REQUEST_FAILURE.labels(label).inc(); + RAW_REQUEST_FAILURE.labels(labels).inc(); slowLog.setError(e); throw e; } finally { @@ -326,20 +332,20 @@ public List batchGet(List keys) { @Override public void batchDelete(List keys) { - String label = "client_raw_batch_delete"; - Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + String[] labels = withClusterId("client_raw_batch_delete"); + Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer(); SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS())); SlowLogSpan span = slowLog.start("batchDelete"); span.addProperty("keySize", String.valueOf(keys.size())); ConcreteBackOffer backOffer = - ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchWriteTimeoutInMS(), slowLog); + ConcreteBackOffer.newDeadlineBackOff( + conf.getRawKVBatchWriteTimeoutInMS(), slowLog, clusterId); try { long deadline = System.currentTimeMillis() + conf.getRawKVBatchWriteTimeoutInMS(); doSendBatchDelete(backOffer, keys, deadline); - RAW_REQUEST_SUCCESS.labels(label).inc(); - return; + RAW_REQUEST_SUCCESS.labels(labels).inc(); } catch (Exception e) { - RAW_REQUEST_FAILURE.labels(label).inc(); + RAW_REQUEST_FAILURE.labels(labels).inc(); slowLog.setError(e); throw e; } finally { @@ -351,19 +357,19 @@ public void batchDelete(List keys) { @Override public Optional getKeyTTL(ByteString key) { - String label = 
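
The get/batchGet/batchDelete/scan/delete hunks that follow all repeat the same instrumentation shape. A condensed sketch of that shape, using the fields of this class, may help when reading the remaining near-identical changes; doCall is a placeholder for the per-operation RPC loop and is not a real method.

String[] labels = withClusterId("client_raw_<operation>");       // {type, cluster}
Histogram.Timer timer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVReadSlowLogInMS()));
ConcreteBackOffer backOffer =
    ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS(), slowLog, clusterId);
try {
  doCall(backOffer);                                              // placeholder for the RPC loop
  RAW_REQUEST_SUCCESS.labels(labels).inc();
} catch (Exception e) {
  RAW_REQUEST_FAILURE.labels(labels).inc();
  slowLog.setError(e);
  throw e;
} finally {
  timer.observeDuration();
  slowLog.log();
}
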
"client_raw_get_key_ttl"; - Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + String[] labels = withClusterId("client_raw_get_key_ttl"); + Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer(); SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVReadSlowLogInMS())); SlowLogSpan span = slowLog.start("getKeyTTL"); span.addProperty("key", KeyUtils.formatBytesUTF8(key)); ConcreteBackOffer backOffer = - ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS(), slowLog); + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS(), slowLog, clusterId); try { while (true) { try (RegionStoreClient client = clientBuilder.build(key, backOffer)) { span.addProperty("region", client.getRegion().toString()); Optional result = client.rawGetKeyTTL(backOffer, key); - RAW_REQUEST_SUCCESS.labels(label).inc(); + RAW_REQUEST_SUCCESS.labels(labels).inc(); return result; } catch (final TiKVException e) { backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); @@ -371,7 +377,7 @@ public Optional getKeyTTL(ByteString key) { } } } catch (Exception e) { - RAW_REQUEST_FAILURE.labels(label).inc(); + RAW_REQUEST_FAILURE.labels(labels).inc(); slowLog.setError(e); throw e; } finally { @@ -403,8 +409,8 @@ public List> batchScanKeys( @Override public List> batchScan(List ranges) { - String label = "client_raw_batch_scan"; - Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + String[] labels = withClusterId("client_raw_batch_scan"); + Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer(); long deadline = System.currentTimeMillis() + conf.getRawKVScanTimeoutInMS(); List>>> futureList = new ArrayList<>(); try { @@ -439,10 +445,10 @@ public List> batchScan(List ranges) { throw new TiKVException("Execution exception met.", e); } } - RAW_REQUEST_SUCCESS.labels(label).inc(); + RAW_REQUEST_SUCCESS.labels(labels).inc(); return scanResults; } catch (Exception e) { - RAW_REQUEST_FAILURE.labels(label).inc(); + RAW_REQUEST_FAILURE.labels(labels).inc(); for (Future>> future : futureList) { future.cancel(true); } @@ -459,8 +465,8 @@ public List scan(ByteString startKey, ByteString endKey, int limit) { @Override public List scan(ByteString startKey, ByteString endKey, int limit, boolean keyOnly) { - String label = "client_raw_scan"; - Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + String[] labels = withClusterId("client_raw_scan"); + Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer(); SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVScanSlowLogInMS())); SlowLogSpan span = slowLog.start("scan"); span.addProperty("startKey", KeyUtils.formatBytesUTF8(startKey)); @@ -468,16 +474,16 @@ public List scan(ByteString startKey, ByteString endKey, int limit, bool span.addProperty("limit", String.valueOf(limit)); span.addProperty("keyOnly", String.valueOf(keyOnly)); ConcreteBackOffer backOffer = - ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS(), slowLog); + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS(), slowLog, clusterId); try { Iterator iterator = rawScanIterator(conf, clientBuilder, startKey, endKey, limit, keyOnly, backOffer); List result = new ArrayList<>(); iterator.forEachRemaining(result::add); - RAW_REQUEST_SUCCESS.labels(label).inc(); + RAW_REQUEST_SUCCESS.labels(labels).inc(); return result; } catch (Exception e) { - RAW_REQUEST_FAILURE.labels(label).inc(); + 
RAW_REQUEST_FAILURE.labels(labels).inc(); slowLog.setError(e); throw e; } finally { @@ -504,15 +510,15 @@ public List scan(ByteString startKey, ByteString endKey) { @Override public List scan(ByteString startKey, ByteString endKey, boolean keyOnly) { - String label = "client_raw_scan_without_limit"; - Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + String[] labels = withClusterId("client_raw_scan_without_limit"); + Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer(); SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVScanSlowLogInMS())); SlowLogSpan span = slowLog.start("scan"); span.addProperty("startKey", KeyUtils.formatBytesUTF8(startKey)); span.addProperty("endKey", KeyUtils.formatBytesUTF8(endKey)); span.addProperty("keyOnly", String.valueOf(keyOnly)); ConcreteBackOffer backOffer = - ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS(), slowLog); + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS(), slowLog, clusterId); try { ByteString newStartKey = startKey; List result = new ArrayList<>(); @@ -532,10 +538,10 @@ public List scan(ByteString startKey, ByteString endKey, boolean keyOnly iterator.forEachRemaining(result::add); newStartKey = Key.toRawKey(result.get(result.size() - 1).getKey()).next().toByteString(); } - RAW_REQUEST_SUCCESS.labels(label).inc(); + RAW_REQUEST_SUCCESS.labels(labels).inc(); return result; } catch (Exception e) { - RAW_REQUEST_FAILURE.labels(label).inc(); + RAW_REQUEST_FAILURE.labels(labels).inc(); slowLog.setError(e); throw e; } finally { @@ -570,19 +576,19 @@ public List scanPrefix(ByteString prefixKey, boolean keyOnly) { @Override public void delete(ByteString key) { - String label = "client_raw_delete"; - Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + String[] labels = withClusterId("client_raw_delete"); + Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer(); SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS())); SlowLogSpan span = slowLog.start("delete"); span.addProperty("key", KeyUtils.formatBytesUTF8(key)); ConcreteBackOffer backOffer = - ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog); + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog, clusterId); try { while (true) { try (RegionStoreClient client = clientBuilder.build(key, backOffer)) { span.addProperty("region", client.getRegion().toString()); client.rawDelete(backOffer, key, atomicForCAS); - RAW_REQUEST_SUCCESS.labels(label).inc(); + RAW_REQUEST_SUCCESS.labels(labels).inc(); return; } catch (final TiKVException e) { backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); @@ -590,7 +596,7 @@ public void delete(ByteString key) { } } } catch (Exception e) { - RAW_REQUEST_FAILURE.labels(label).inc(); + RAW_REQUEST_FAILURE.labels(labels).inc(); slowLog.setError(e); throw e; } finally { @@ -602,17 +608,17 @@ public void delete(ByteString key) { @Override public synchronized void deleteRange(ByteString startKey, ByteString endKey) { - String label = "client_raw_delete_range"; - Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + String[] labels = withClusterId("client_raw_delete_range"); + Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer(); ConcreteBackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff( - conf.getRawKVCleanTimeoutInMS(), SlowLogEmptyImpl.INSTANCE); + 
conf.getRawKVCleanTimeoutInMS(), SlowLogEmptyImpl.INSTANCE, clusterId); try { long deadline = System.currentTimeMillis() + conf.getRawKVCleanTimeoutInMS(); doSendDeleteRange(backOffer, startKey, endKey, deadline); - RAW_REQUEST_SUCCESS.labels(label).inc(); + RAW_REQUEST_SUCCESS.labels(labels).inc(); } catch (Exception e) { - RAW_REQUEST_FAILURE.labels(label).inc(); + RAW_REQUEST_FAILURE.labels(labels).inc(); throw e; } finally { requestTimer.observeDuration(); @@ -625,6 +631,11 @@ public synchronized void deletePrefix(ByteString key) { deleteRange(key, endKey); } + @Override + public TiSession getSession() { + return tiSession; + } + /** * Ingest KV pairs to RawKV using StreamKV API. * @@ -1048,15 +1059,15 @@ public Iterator scan0(ByteString startKey, int limit, boolean keyOnly) { */ public Iterator scan0( ByteString startKey, ByteString endKey, int limit, boolean keyOnly) { - String label = "client_raw_scan"; - Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + String[] labels = withClusterId("client_raw_scan"); + Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer(); try { Iterator iterator = rawScanIterator(conf, clientBuilder, startKey, endKey, limit, keyOnly, defaultBackOff()); - RAW_REQUEST_SUCCESS.labels(label).inc(); + RAW_REQUEST_SUCCESS.labels(labels).inc(); return iterator; } catch (Exception e) { - RAW_REQUEST_FAILURE.labels(label).inc(); + RAW_REQUEST_FAILURE.labels(labels).inc(); throw e; } finally { requestTimer.observeDuration(); @@ -1171,6 +1182,6 @@ public KvPair next() { } private BackOffer defaultBackOff() { - return ConcreteBackOffer.newCustomBackOff(conf.getRawKVDefaultBackoffInMS()); + return ConcreteBackOffer.newCustomBackOff(conf.getRawKVDefaultBackoffInMS(), clusterId); } } diff --git a/src/main/java/org/tikv/raw/RawKVClientBase.java b/src/main/java/org/tikv/raw/RawKVClientBase.java index bc49dba8bc6..74eacc854cc 100644 --- a/src/main/java/org/tikv/raw/RawKVClientBase.java +++ b/src/main/java/org/tikv/raw/RawKVClientBase.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import org.tikv.common.TiSession; import org.tikv.common.util.Pair; import org.tikv.common.util.ScanOption; import org.tikv.kvproto.Kvrpcpb; @@ -271,4 +272,7 @@ public interface RawKVClientBase extends AutoCloseable { * @param key prefix of keys to be deleted */ void deletePrefix(ByteString key); + + /** Get the session of the current client */ + TiSession getSession(); } diff --git a/src/main/java/org/tikv/raw/SmartRawKVClient.java b/src/main/java/org/tikv/raw/SmartRawKVClient.java index c8ea8fbe2d5..429a3735048 100644 --- a/src/main/java/org/tikv/raw/SmartRawKVClient.java +++ b/src/main/java/org/tikv/raw/SmartRawKVClient.java @@ -25,6 +25,7 @@ import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.tikv.common.TiSession; import org.tikv.common.exception.CircuitBreakerOpenException; import org.tikv.common.util.HistogramUtils; import org.tikv.common.util.Pair; @@ -39,28 +40,28 @@ public class SmartRawKVClient implements RawKVClientBase { HistogramUtils.buildDuration() .name("client_java_smart_raw_requests_latency") .help("client smart raw request latency.") - .labelNames("type") + .labelNames("type", "cluster") .register(); private static final Counter REQUEST_SUCCESS = Counter.build() .name("client_java_smart_raw_requests_success") .help("client smart raw request success.") - .labelNames("type") + .labelNames("type", "cluster") .register(); private static 
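
A sketch of why RawKVClientBase gained getSession(): a wrapper that only holds the interface can now derive the cluster label without keeping its own copy of the id. This helper is illustrative and not part of the diff; the TiSession -> PDClient -> cluster id chain is the one used by SmartRawKVClient below.

import org.tikv.raw.RawKVClientBase;

class ClusterIdLookup {
  static String clusterLabel(RawKVClientBase client) {
    return client.getSession().getPDClient().getClusterId().toString();
  }
}
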
final Counter REQUEST_FAILURE = Counter.build() .name("client_java_smart_raw_requests_failure") .help("client smart raw request failure.") - .labelNames("type") + .labelNames("type", "cluster") .register(); private static final Counter CIRCUIT_BREAKER_OPENED = Counter.build() .name("client_java_smart_raw_circuit_breaker_opened") .help("client smart raw circuit breaker opened.") - .labelNames("type") + .labelNames("type", "cluster") .register(); private final RawKVClientBase client; @@ -204,14 +205,22 @@ public void deletePrefix(ByteString key) { callWithCircuitBreaker("deletePrefix", () -> client.deletePrefix(key)); } + @Override + public TiSession getSession() { + return client.getSession(); + } + T callWithCircuitBreaker(String funcName, Function1 func) { - Histogram.Timer requestTimer = REQUEST_LATENCY.labels(funcName).startTimer(); + String[] labels = + new String[] {funcName, client.getSession().getPDClient().getClusterId().toString()}; + + Histogram.Timer requestTimer = REQUEST_LATENCY.labels(labels).startTimer(); try { T result = callWithCircuitBreaker0(funcName, func); - REQUEST_SUCCESS.labels(funcName).inc(); + REQUEST_SUCCESS.labels(labels).inc(); return result; } catch (Exception e) { - REQUEST_FAILURE.labels(funcName).inc(); + REQUEST_FAILURE.labels(labels).inc(); throw e; } finally { requestTimer.observeDuration(); @@ -244,7 +253,9 @@ private T callWithCircuitBreaker0(String funcName, Function1 func) { } } else { logger.debug("Circuit Breaker Opened"); - CIRCUIT_BREAKER_OPENED.labels(funcName).inc(); + CIRCUIT_BREAKER_OPENED + .labels(funcName, client.getSession().getPDClient().getClusterId().toString()) + .inc(); throw new CircuitBreakerOpenException(); } } diff --git a/src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java b/src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java index 16b2bd76cfa..bf004069287 100644 --- a/src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java +++ b/src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java @@ -32,9 +32,10 @@ public class CircuitBreakerImpl implements CircuitBreaker { Counter.build() .name("client_java_circuit_breaker_attempt_counter") .help("client circuit breaker attempt counter.") - .labelNames("type") + .labelNames("type", "cluster") .register(); + private final Long clusterId; private final boolean enable; private final int windowInSeconds; private final int errorThresholdPercentage; @@ -49,14 +50,15 @@ public class CircuitBreakerImpl implements CircuitBreaker { private final CircuitBreakerMetrics metrics; - public CircuitBreakerImpl(TiConfiguration conf) { + public CircuitBreakerImpl(TiConfiguration conf, long clusterId) { this( conf.isCircuitBreakEnable(), conf.getCircuitBreakAvailabilityWindowInSeconds(), conf.getCircuitBreakAvailabilityErrorThresholdPercentage(), conf.getCircuitBreakAvailabilityRequestVolumnThreshold(), conf.getCircuitBreakSleepWindowInSeconds(), - conf.getCircuitBreakAttemptRequestCount()); + conf.getCircuitBreakAttemptRequestCount(), + clusterId); } public CircuitBreakerImpl( @@ -65,8 +67,10 @@ public CircuitBreakerImpl( int errorThresholdPercentage, int requestVolumeThreshold, int sleepWindowInSeconds, - int attemptRequestCount) { + int attemptRequestCount, + long clusterId) { this.enable = enable; + this.clusterId = clusterId; this.windowInSeconds = windowInSeconds; this.errorThresholdPercentage = errorThresholdPercentage; this.requestVolumeThreshold = requestVolumeThreshold; @@ -125,7 +129,7 @@ Status getStatus() { @Override public void recordAttemptSuccess() { - 
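
With the extra constructor parameter, the circuit breaker needs the cluster id at construction time. A hedged sketch of a construction site follows; where the breaker is actually built is not shown in this diff, and `conf` (TiConfiguration) and `session` (TiSession) are assumed to be in scope.

// assumed: TiConfiguration conf, TiSession session
CircuitBreaker breaker =
    new CircuitBreakerImpl(conf, session.getPDClient().getClusterId());
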
CIRCUIT_BREAKER_ATTEMPT_COUNTER.labels("success").inc(); + CIRCUIT_BREAKER_ATTEMPT_COUNTER.labels("success", clusterId.toString()).inc(); if (attemptSuccessCount.incrementAndGet() >= this.attemptRequestCount) { halfOpen2Close(); } @@ -133,7 +137,7 @@ public void recordAttemptSuccess() { @Override public void recordAttemptFailure() { - CIRCUIT_BREAKER_ATTEMPT_COUNTER.labels("failure").inc(); + CIRCUIT_BREAKER_ATTEMPT_COUNTER.labels("failure", clusterId.toString()).inc(); halfOpen2Open(); } diff --git a/src/main/java/org/tikv/txn/KVClient.java b/src/main/java/org/tikv/txn/KVClient.java index 2e87dd33c9b..dfa9b8b2962 100644 --- a/src/main/java/org/tikv/txn/KVClient.java +++ b/src/main/java/org/tikv/txn/KVClient.java @@ -77,7 +77,9 @@ public void close() { * @return a ByteString value if key exists, ByteString.EMPTY if key does not exist */ public ByteString get(ByteString key, long version) throws GrpcException { - BackOffer backOffer = ConcreteBackOffer.newGetBackOff(); + BackOffer backOffer = + ConcreteBackOffer.newGetBackOff( + clientBuilder.getRegionManager().getPDClient().getClusterId()); while (true) { RegionStoreClient client = clientBuilder.build(key); try { @@ -178,7 +180,9 @@ public synchronized void ingest(List> list) throws List keyList = list.stream().map(pair -> pair.first).collect(Collectors.toList()); Map> groupKeys = groupKeysByRegion( - clientBuilder.getRegionManager(), keyList, ConcreteBackOffer.newRawKVBackOff()); + clientBuilder.getRegionManager(), + keyList, + ConcreteBackOffer.newRawKVBackOff(tiSession.getPDClient().getClusterId())); // ingest for each region for (Map.Entry> entry : groupKeys.entrySet()) { diff --git a/src/main/java/org/tikv/txn/TTLManager.java b/src/main/java/org/tikv/txn/TTLManager.java index 4168fc28682..1d9ffb7ff8f 100644 --- a/src/main/java/org/tikv/txn/TTLManager.java +++ b/src/main/java/org/tikv/txn/TTLManager.java @@ -92,7 +92,9 @@ public void keepAlive() { } private void doKeepAlive() { - BackOffer bo = ConcreteBackOffer.newCustomBackOff(MANAGED_LOCK_TTL); + BackOffer bo = + ConcreteBackOffer.newCustomBackOff( + MANAGED_LOCK_TTL, regionManager.getPDClient().getClusterId()); long uptime = kvClient.getTimestamp().getPhysical() - TiTimestamp.extractPhysical(startTS); long ttl = uptime + MANAGED_LOCK_TTL; diff --git a/src/main/java/org/tikv/txn/TwoPhaseCommitter.java b/src/main/java/org/tikv/txn/TwoPhaseCommitter.java index 56e1835b265..550b7de47ac 100644 --- a/src/main/java/org/tikv/txn/TwoPhaseCommitter.java +++ b/src/main/java/org/tikv/txn/TwoPhaseCommitter.java @@ -284,7 +284,9 @@ private void doPrewriteSecondaryKeys( // consume one task if reaches task limit completionService.take().get(); } - BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(maxBackOfferMS); + BackOffer backOffer = + ConcreteBackOffer.newCustomBackOff( + maxBackOfferMS, regionManager.getPDClient().getClusterId()); completionService.submit( () -> { doPrewriteSecondaryKeysInBatchesWithRetry( @@ -541,7 +543,9 @@ private void doCommitSecondaryKeys( // consume one task if reaches task limit completionService.take().get(); } - BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(commitBackOfferMS); + BackOffer backOffer = + ConcreteBackOffer.newCustomBackOff( + commitBackOfferMS, regionManager.getPDClient().getClusterId()); completionService.submit( () -> { doCommitSecondaryKeysWithRetry(backOffer, keyBytes, curSize, commitTs); diff --git a/src/main/java/org/tikv/txn/TxnKVClient.java b/src/main/java/org/tikv/txn/TxnKVClient.java index 3dae8c85abb..7806c56496e 100644 --- 
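
The txn-side call sites above (KVClient, TTLManager, TwoPhaseCommitter) all fetch the id the same way from the RegionManager's PD client. A small private helper, purely illustrative and not in the diff, would keep those call sites shorter.

// inside a class that already holds a RegionManager field `regionManager`
private BackOffer customBackOff(int maxSleepMs) {
  return ConcreteBackOffer.newCustomBackOff(
      maxSleepMs, regionManager.getPDClient().getClusterId());
}
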
a/src/main/java/org/tikv/txn/TxnKVClient.java +++ b/src/main/java/org/tikv/txn/TxnKVClient.java @@ -70,7 +70,7 @@ public RegionManager getRegionManager() { } public TiTimestamp getTimestamp() { - BackOffer bo = ConcreteBackOffer.newTsoBackOff(); + BackOffer bo = ConcreteBackOffer.newTsoBackOff(pdClient.getClusterId()); TiTimestamp timestamp = new TiTimestamp(0, 0); try { while (true) { diff --git a/src/test/java/org/tikv/raw/RawKVClientTest.java b/src/test/java/org/tikv/raw/RawKVClientTest.java index d4ad7bdbe06..98e4c0d620a 100644 --- a/src/test/java/org/tikv/raw/RawKVClientTest.java +++ b/src/test/java/org/tikv/raw/RawKVClientTest.java @@ -201,7 +201,8 @@ public void testCustomBackOff() { public void testDeadlineBackOff() { int timeout = 2000; int sleep = 150; - BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(timeout, SlowLogEmptyImpl.INSTANCE); + BackOffer backOffer = + ConcreteBackOffer.newDeadlineBackOff(timeout, SlowLogEmptyImpl.INSTANCE, 0); long s = System.currentTimeMillis(); try { while (true) { diff --git a/src/test/java/org/tikv/service/failsafe/CircuitBreakerTest.java b/src/test/java/org/tikv/service/failsafe/CircuitBreakerTest.java index 080d234aa14..914db625615 100644 --- a/src/test/java/org/tikv/service/failsafe/CircuitBreakerTest.java +++ b/src/test/java/org/tikv/service/failsafe/CircuitBreakerTest.java @@ -42,7 +42,8 @@ public void testCircuitBreaker() throws InterruptedException { errorThresholdPercentage, requestVolumeThreshold, sleepWindowInSeconds, - attemptRequestCount); + attemptRequestCount, + 1024); CircuitBreakerMetrics metrics = circuitBreaker.getMetrics(); // initial state: CLOSE
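
A test-side sketch, not part of the diff, of how the new label could be asserted: the Prometheus default registry exposes the backoff histogram's count sample keyed by both labels. The metric and label names come from ConcreteBackOffer above; the label values and the expectation are examples only.

import io.prometheus.client.CollectorRegistry;

// after driving a back-off with clusterId = 0 and funcType BoRegionMiss, the sample should exist:
Double observed =
    CollectorRegistry.defaultRegistry.getSampleValue(
        "client_java_backoff_duration_count",
        new String[] {"type", "cluster"},
        new String[] {"BoRegionMiss", "0"});
// expect a non-null value greater than zero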