Skip to content

Commit

Permalink
Merge branch 'master' into fix-typo-in-github-template
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Mar 28, 2022
2 parents 867ac60 + 6cbf56a commit a12a6f6
Show file tree
Hide file tree
Showing 31 changed files with 470 additions and 237 deletions.
103 changes: 67 additions & 36 deletions metrics/grafana/client_java_summary.json

Large diffs are not rendered by default.

90 changes: 42 additions & 48 deletions src/main/java/org/tikv/common/AbstractGRPCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,16 @@ public <ReqT, RespT> RespT callWithRetry(
if (logger.isTraceEnabled()) {
logger.trace(String.format("Calling %s...", method.getFullMethodName()));
}
RetryPolicy.Builder<RespT> builder = new Builder<>(backOffer);
RetryPolicy<RespT> policy = new Builder<RespT>(backOffer).create(handler);
RespT resp =
builder
.create(handler)
.callWithRetry(
() -> {
BlockingStubT stub = getBlockingStub();
return ClientCalls.blockingUnaryCall(
stub.getChannel(), method, stub.getCallOptions(), requestFactory.get());
},
method.getFullMethodName(),
backOffer);
policy.callWithRetry(
() -> {
BlockingStubT stub = getBlockingStub();
return ClientCalls.blockingUnaryCall(
stub.getChannel(), method, stub.getCallOptions(), requestFactory.get());
},
method.getFullMethodName(),
backOffer);

if (logger.isTraceEnabled()) {
logger.trace(String.format("leaving %s...", method.getFullMethodName()));
Expand All @@ -109,20 +107,18 @@ protected <ReqT, RespT> void callAsyncWithRetry(
ErrorHandler<RespT> handler) {
logger.debug(String.format("Calling %s...", method.getFullMethodName()));

RetryPolicy.Builder<RespT> builder = new Builder<>(backOffer);
builder
.create(handler)
.callWithRetry(
() -> {
FutureStubT stub = getAsyncStub();
ClientCalls.asyncUnaryCall(
stub.getChannel().newCall(method, stub.getCallOptions()),
requestFactory.get(),
responseObserver);
return null;
},
method.getFullMethodName(),
backOffer);
RetryPolicy<RespT> policy = new Builder<RespT>(backOffer).create(handler);
policy.callWithRetry(
() -> {
FutureStubT stub = getAsyncStub();
ClientCalls.asyncUnaryCall(
stub.getChannel().newCall(method, stub.getCallOptions()),
requestFactory.get(),
responseObserver);
return null;
},
method.getFullMethodName(),
backOffer);
logger.debug(String.format("leaving %s...", method.getFullMethodName()));
}

Expand All @@ -133,18 +129,17 @@ <ReqT, RespT> StreamObserver<ReqT> callBidiStreamingWithRetry(
ErrorHandler<StreamObserver<ReqT>> handler) {
logger.debug(String.format("Calling %s...", method.getFullMethodName()));

RetryPolicy.Builder<StreamObserver<ReqT>> builder = new Builder<>(backOffer);
RetryPolicy<StreamObserver<ReqT>> policy =
new Builder<StreamObserver<ReqT>>(backOffer).create(handler);
StreamObserver<ReqT> observer =
builder
.create(handler)
.callWithRetry(
() -> {
FutureStubT stub = getAsyncStub();
return asyncBidiStreamingCall(
stub.getChannel().newCall(method, stub.getCallOptions()), responseObserver);
},
method.getFullMethodName(),
backOffer);
policy.callWithRetry(
() -> {
FutureStubT stub = getAsyncStub();
return asyncBidiStreamingCall(
stub.getChannel().newCall(method, stub.getCallOptions()), responseObserver);
},
method.getFullMethodName(),
backOffer);
logger.debug(String.format("leaving %s...", method.getFullMethodName()));
return observer;
}
Expand All @@ -156,19 +151,18 @@ public <ReqT, RespT> StreamingResponse callServerStreamingWithRetry(
ErrorHandler<StreamingResponse> handler) {
logger.debug(String.format("Calling %s...", method.getFullMethodName()));

RetryPolicy.Builder<StreamingResponse> builder = new Builder<>(backOffer);
RetryPolicy<StreamingResponse> policy =
new Builder<StreamingResponse>(backOffer).create(handler);
StreamingResponse response =
builder
.create(handler)
.callWithRetry(
() -> {
BlockingStubT stub = getBlockingStub();
return new StreamingResponse(
blockingServerStreamingCall(
stub.getChannel(), method, stub.getCallOptions(), requestFactory.get()));
},
method.getFullMethodName(),
backOffer);
policy.callWithRetry(
() -> {
BlockingStubT stub = getBlockingStub();
return new StreamingResponse(
blockingServerStreamingCall(
stub.getChannel(), method, stub.getCallOptions(), requestFactory.get()));
},
method.getFullMethodName(),
backOffer);
logger.debug(String.format("leaving %s...", method.getFullMethodName()));
return response;
}
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/org/tikv/common/KVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ public void close() {}
* @return a ByteString value if key exists, ByteString.EMPTY if key does not exist
*/
public ByteString get(ByteString key, long version) throws GrpcException {
BackOffer backOffer = ConcreteBackOffer.newGetBackOff();
BackOffer backOffer =
ConcreteBackOffer.newGetBackOff(
clientBuilder.getRegionManager().getPDClient().getClusterId());
while (true) {
RegionStoreClient client = clientBuilder.build(key);
try {
Expand Down
8 changes: 5 additions & 3 deletions src/main/java/org/tikv/common/PDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
HistogramUtils.buildDuration()
.name("client_java_pd_get_region_by_requests_latency")
.help("pd getRegionByKey request latency.")
.labelNames("cluster")
.register();

private PDClient(TiConfiguration conf, ChannelFactory channelFactory) {
Expand Down Expand Up @@ -281,7 +282,7 @@ private GetOperatorResponse getOperator(long regionId) {
() -> GetOperatorRequest.newBuilder().setHeader(header).setRegionId(regionId).build();
// get operator no need to handle error and no need back offer.
return callWithRetry(
ConcreteBackOffer.newCustomBackOff(0),
ConcreteBackOffer.newCustomBackOff(0, getClusterId()),
PDGrpc.getGetOperatorMethod(),
request,
new NoopHandler<>());
Expand Down Expand Up @@ -309,7 +310,8 @@ private boolean isScatterRegionFinish(GetOperatorResponse resp) {

@Override
public Pair<Metapb.Region, Metapb.Peer> getRegionByKey(BackOffer backOffer, ByteString key) {
Histogram.Timer requestTimer = PD_GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer();
Histogram.Timer requestTimer =
PD_GET_REGION_BY_KEY_REQUEST_LATENCY.labels(getClusterId().toString()).startTimer();
try {
if (conf.isTxnKVMode()) {
CodecDataOutput cdo = new CodecDataOutput();
Expand Down Expand Up @@ -841,7 +843,7 @@ private Metapb.Region decodeRegion(Metapb.Region region) {
return builder.build();
}

public long getClusterId() {
public Long getClusterId() {
return header.getClusterId();
}

Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/tikv/common/ReadOnlyPDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,6 @@ List<Pdpb.Region> scanRegions(
List<Store> getAllStores(BackOffer backOffer);

TiConfiguration.ReplicaRead getReplicaRead();

Long getClusterId();
}
4 changes: 3 additions & 1 deletion src/main/java/org/tikv/common/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ public List<org.tikv.common.BytePairWrapper> batchGet(int backOffer, List<byte[]
try (KVClient client = new KVClient(session, session.getRegionStoreClientBuilder())) {
List<KvPair> kvPairList =
client.batchGet(
ConcreteBackOffer.newCustomBackOff(backOffer), list, timestamp.getVersion());
ConcreteBackOffer.newCustomBackOff(backOffer, session.getPDClient().getClusterId()),
list,
timestamp.getVersion());
return kvPairList
.stream()
.map(
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/tikv/common/StoreVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public static int compareTo(String v0, String v1) {
public static boolean minTiKVVersion(String version, PDClient pdClient) {
StoreVersion storeVersion = new StoreVersion(version);

BackOffer bo = ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF);
BackOffer bo =
ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF, pdClient.getClusterId());
List<Metapb.Store> storeList =
pdClient
.getAllStores(bo)
Expand Down
18 changes: 12 additions & 6 deletions src/main/java/org/tikv/common/TiSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public TiSession(TiConfiguration conf) {
if (conf.isWarmUpEnable() && conf.isRawKVMode()) {
warmUp();
}
this.circuitBreaker = new CircuitBreakerImpl(conf);
this.circuitBreaker = new CircuitBreakerImpl(conf, client.getClusterId());
logger.info("TiSession initialized in " + conf.getKvMode() + " mode");
}

Expand All @@ -179,7 +179,7 @@ private static VersionInfo getVersionInfo() {

private synchronized void warmUp() {
long warmUpStartTime = System.nanoTime();
BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff();
BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff(getPDClient().getClusterId());
try {
// let JVM ClassLoader load gRPC error related classes
// this operation may cost 100ms
Expand Down Expand Up @@ -329,7 +329,8 @@ public TiConfiguration getConf() {
public TiTimestamp getTimestamp() {
checkIsClosed();

return getPDClient().getTimestamp(ConcreteBackOffer.newTsoBackOff());
return getPDClient()
.getTimestamp(ConcreteBackOffer.newTsoBackOff(getPDClient().getClusterId()));
}

public Snapshot createSnapshot() {
Expand Down Expand Up @@ -586,13 +587,16 @@ public void splitRegionAndScatter(
.stream()
.map(k -> Key.toRawKey(k).toByteString())
.collect(Collectors.toList()),
ConcreteBackOffer.newCustomBackOff(splitRegionBackoffMS));
ConcreteBackOffer.newCustomBackOff(splitRegionBackoffMS, getPDClient().getClusterId()));

// scatter region
for (Metapb.Region newRegion : newRegions) {
try {
getPDClient()
.scatterRegion(newRegion, ConcreteBackOffer.newCustomBackOff(scatterRegionBackoffMS));
.scatterRegion(
newRegion,
ConcreteBackOffer.newCustomBackOff(
scatterRegionBackoffMS, getPDClient().getClusterId()));
} catch (Exception e) {
logger.warn(String.format("failed to scatter region: %d", newRegion.getId()), e);
}
Expand All @@ -609,7 +613,9 @@ public void splitRegionAndScatter(
return;
}
getPDClient()
.waitScatterRegionFinish(newRegion, ConcreteBackOffer.newCustomBackOff((int) remainMS));
.waitScatterRegionFinish(
newRegion,
ConcreteBackOffer.newCustomBackOff((int) remainMS, getPDClient().getClusterId()));
}
} else {
logger.info("skip to wait scatter region finish");
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/org/tikv/common/importer/ImporterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,9 @@ private void ingest() throws GrpcException {
}

Object writeResponse = clientLeader.getWriteResponse();
BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(BackOffer.INGEST_BACKOFF);
BackOffer backOffer =
ConcreteBackOffer.newCustomBackOff(
BackOffer.INGEST_BACKOFF, tiSession.getPDClient().getClusterId());
ingestWithRetry(writeResponse, backOffer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ private void switchTiKVToImportMode() {
}

private void doSwitchTiKVMode(ImportSstpb.SwitchMode mode) {
BackOffer bo = ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF);
BackOffer bo =
ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF, pdClient.getClusterId());
List<Metapb.Store> allStores = pdClient.getAllStores(bo);
for (Metapb.Store store : allStores) {
ImporterStoreClient client = builder.build(new TiStore(store));
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/tikv/common/log/SlowLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,7 @@ default SlowLog withField(String key, Object value) {
return withFields(ImmutableMap.of(key, value));
}

Object getField(String key);

void log();
}
5 changes: 5 additions & 0 deletions src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ public SlowLog withFields(Map<String, Object> fields) {
return this;
}

@Override
public Object getField(String key) {
return null;
}

@Override
public void log() {}
}
5 changes: 5 additions & 0 deletions src/main/java/org/tikv/common/log/SlowLogImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ public SlowLog withFields(Map<String, Object> fields) {
return this;
}

@Override
public Object getField(String key) {
return fields.get(key);
}

@Override
public void log() {
recordTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ private ByteString resolveCurrentLock(Kvrpcpb.KvPair current) {
builder.getRegionManager().getRegionStorePairByKey(current.getKey());
TiRegion region = pair.first;
TiStore store = pair.second;
BackOffer backOffer = ConcreteBackOffer.newGetBackOff();
BackOffer backOffer =
ConcreteBackOffer.newGetBackOff(builder.getRegionManager().getPDClient().getClusterId());
try (RegionStoreClient client = builder.build(region, store)) {
return client.get(backOffer, current.getKey(), version);
} catch (Exception e) {
Expand Down
16 changes: 8 additions & 8 deletions src/main/java/org/tikv/common/policy/RetryPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,19 @@ public abstract class RetryPolicy<RespT> {
HistogramUtils.buildDuration()
.name("client_java_grpc_single_requests_latency")
.help("grpc request latency.")
.labelNames("type")
.labelNames("type", "cluster")
.register();
public static final Histogram CALL_WITH_RETRY_DURATION =
HistogramUtils.buildDuration()
.name("client_java_call_with_retry_duration")
.help("callWithRetry duration.")
.labelNames("type")
.labelNames("type", "cluster")
.register();
public static final Counter GRPC_REQUEST_RETRY_NUM =
Counter.build()
.name("client_java_grpc_requests_retry_num")
.help("grpc request retry num.")
.labelNames("type")
.labelNames("type", "cluster")
.register();

// handles PD and TiKV's error.
Expand All @@ -72,16 +72,16 @@ private void rethrowNotRecoverableException(Exception e) {
}

public RespT callWithRetry(Callable<RespT> proc, String methodName, BackOffer backOffer) {
Histogram.Timer callWithRetryTimer = CALL_WITH_RETRY_DURATION.labels(methodName).startTimer();
String[] labels = new String[] {methodName, backOffer.getClusterId().toString()};
Histogram.Timer callWithRetryTimer = CALL_WITH_RETRY_DURATION.labels(labels).startTimer();
SlowLogSpan callWithRetrySlowLogSpan = backOffer.getSlowLog().start("callWithRetry");
callWithRetrySlowLogSpan.addProperty("method", methodName);
try {
while (true) {
RespT result = null;
try {
// add single request duration histogram
Histogram.Timer requestTimer =
GRPC_SINGLE_REQUEST_LATENCY.labels(methodName).startTimer();
Histogram.Timer requestTimer = GRPC_SINGLE_REQUEST_LATENCY.labels(labels).startTimer();
SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("gRPC");
slowLogSpan.addProperty("method", methodName);
try {
Expand All @@ -96,7 +96,7 @@ public RespT callWithRetry(Callable<RespT> proc, String methodName, BackOffer ba
backOffer.checkTimeout();
boolean retry = handler.handleRequestError(backOffer, e);
if (retry) {
GRPC_REQUEST_RETRY_NUM.labels(methodName).inc();
GRPC_REQUEST_RETRY_NUM.labels(labels).inc();
continue;
} else {
return result;
Expand All @@ -107,7 +107,7 @@ public RespT callWithRetry(Callable<RespT> proc, String methodName, BackOffer ba
if (handler != null) {
boolean retry = handler.handleResponseError(backOffer, result);
if (retry) {
GRPC_REQUEST_RETRY_NUM.labels(methodName).inc();
GRPC_REQUEST_RETRY_NUM.labels(labels).inc();
continue;
}
}
Expand Down
Loading

0 comments on commit a12a6f6

Please sign in to comment.