From 6f36b29591cb38a8174d8e18032a7a2a5a979009 Mon Sep 17 00:00:00 2001 From: Liangliang Gu Date: Thu, 20 Jan 2022 13:38:54 +0800 Subject: [PATCH 1/2] cherry pick #484 to release-3.1 Signed-off-by: ti-srebot --- .../java/io/grpc/internal/ClientCallImpl.java | 51 +++++++ .../io/grpc/netty/NettyClientHandler.java | 143 ++++++++++++------ .../java/io/grpc/netty/NettyClientStream.java | 22 +++ src/main/java/io/grpc/netty/WriteQueue.java | 98 +++++++++++- src/main/java/io/grpc/stub/ClientCalls.java | 30 ++++ .../channel/socket/nio/NioSocketChannel.java | 60 +++++++- .../codec/http2/DefaultHttp2FrameWriter.java | 13 +- .../DefaultHttp2RemoteFlowController.java | 15 +- .../codec/http2/Http2ConnectionHandler.java | 19 +++ .../codec/http2/Http2OutboundFrameLogger.java | 12 ++ src/main/java/org/tikv/common/PDClient.java | 3 +- .../org/tikv/common/policy/RetryPolicy.java | 5 +- .../region/AbstractRegionStoreClient.java | 5 +- .../org/tikv/common/region/RegionManager.java | 5 +- .../tikv/common/region/RegionStoreClient.java | 2 +- .../tikv/common/util/ConcreteBackOffer.java | 2 +- .../org/tikv/common/util/HistogramUtils.java | 50 ++++++ src/main/java/org/tikv/raw/RawKVClient.java | 2 +- .../java/org/tikv/raw/SmartRawKVClient.java | 7 +- 19 files changed, 476 insertions(+), 68 deletions(-) create mode 100644 src/main/java/org/tikv/common/util/HistogramUtils.java diff --git a/src/main/java/io/grpc/internal/ClientCallImpl.java b/src/main/java/io/grpc/internal/ClientCallImpl.java index 381681dd599..5b8d2f6ba25 100644 --- a/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -50,6 +50,7 @@ import io.perfmark.Link; import io.perfmark.PerfMark; import io.perfmark.Tag; +import io.prometheus.client.Histogram; import java.io.InputStream; import java.nio.charset.Charset; import java.util.Locale; @@ -61,10 +62,18 @@ import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; +import org.tikv.common.util.HistogramUtils; /** Implementation of {@link ClientCall}. 
*/ final class ClientCallImpl extends ClientCall { + public static final Histogram perfmarkClientCallImplDuration = + HistogramUtils.buildDuration() + .name("perfmark_client_call_impl_duration_seconds") + .help("Perfmark client call impl duration seconds") + .labelNames("type") + .register(); + private static final Logger log = Logger.getLogger(ClientCallImpl.class.getName()); private static final byte[] FULL_STREAM_DECOMPRESSION_ENCODINGS = "gzip".getBytes(Charset.forName("US-ASCII")); @@ -179,10 +188,12 @@ static void prepareHeaders( @Override public void start(Listener observer, Metadata headers) { PerfMark.startTask("ClientCall.start", tag); + Histogram.Timer start = perfmarkClientCallImplDuration.labels("ClientCall.start").startTimer(); try { startInternal(observer, headers); } finally { PerfMark.stopTask("ClientCall.start", tag); + start.observeDuration(); } } @@ -428,22 +439,28 @@ private static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline dea @Override public void request(int numMessages) { PerfMark.startTask("ClientCall.request", tag); + Histogram.Timer request = + perfmarkClientCallImplDuration.labels("ClientCall.request").startTimer(); try { checkState(stream != null, "Not started"); checkArgument(numMessages >= 0, "Number requested must be non-negative"); stream.request(numMessages); } finally { PerfMark.stopTask("ClientCall.request", tag); + request.observeDuration(); } } @Override public void cancel(@Nullable String message, @Nullable Throwable cause) { PerfMark.startTask("ClientCall.cancel", tag); + Histogram.Timer cancel = + perfmarkClientCallImplDuration.labels("ClientCall.cancel").startTimer(); try { cancelInternal(message, cause); } finally { PerfMark.stopTask("ClientCall.cancel", tag); + cancel.observeDuration(); } } @@ -479,10 +496,13 @@ private void cancelInternal(@Nullable String message, @Nullable Throwable cause) @Override public void halfClose() { PerfMark.startTask("ClientCall.halfClose", tag); + Histogram.Timer halfClose = + perfmarkClientCallImplDuration.labels("ClientCall.halfClose").startTimer(); try { halfCloseInternal(); } finally { PerfMark.stopTask("ClientCall.halfClose", tag); + halfClose.observeDuration(); } } @@ -497,10 +517,13 @@ private void halfCloseInternal() { @Override public void sendMessage(ReqT message) { PerfMark.startTask("ClientCall.sendMessage", tag); + Histogram.Timer sendMessage = + perfmarkClientCallImplDuration.labels("ClientCall.sendMessage").startTimer(); try { sendMessageInternal(message); } finally { PerfMark.stopTask("ClientCall.sendMessage", tag); + sendMessage.observeDuration(); } } @@ -582,6 +605,8 @@ private void exceptionThrown(Status status) { @Override public void headersRead(final Metadata headers) { PerfMark.startTask("ClientStreamListener.headersRead", tag); + Histogram.Timer headersRead = + perfmarkClientCallImplDuration.labels("ClientStreamListener.headersRead").startTimer(); final Link link = PerfMark.linkOut(); final class HeadersRead extends ContextRunnable { @@ -592,11 +617,14 @@ final class HeadersRead extends ContextRunnable { @Override public void runInContext() { PerfMark.startTask("ClientCall$Listener.headersRead", tag); + Histogram.Timer headersRead = + perfmarkClientCallImplDuration.labels("ClientCall$Listener.headersRead").startTimer(); PerfMark.linkIn(link); try { runInternal(); } finally { PerfMark.stopTask("ClientCall$Listener.headersRead", tag); + headersRead.observeDuration(); } } @@ -617,12 +645,17 @@ private void runInternal() { callExecutor.execute(new HeadersRead()); } finally { 
PerfMark.stopTask("ClientStreamListener.headersRead", tag); + headersRead.observeDuration(); } } @Override public void messagesAvailable(final MessageProducer producer) { PerfMark.startTask("ClientStreamListener.messagesAvailable", tag); + Histogram.Timer messagesAvailable = + perfmarkClientCallImplDuration + .labels("ClientStreamListener.messagesAvailable") + .startTimer(); final Link link = PerfMark.linkOut(); final class MessagesAvailable extends ContextRunnable { @@ -633,11 +666,16 @@ final class MessagesAvailable extends ContextRunnable { @Override public void runInContext() { PerfMark.startTask("ClientCall$Listener.messagesAvailable", tag); + Histogram.Timer messagesAvailable = + perfmarkClientCallImplDuration + .labels("ClientCall$Listener.messagesAvailable") + .startTimer(); PerfMark.linkIn(link); try { runInternal(); } finally { PerfMark.stopTask("ClientCall$Listener.messagesAvailable", tag); + messagesAvailable.observeDuration(); } } @@ -669,6 +707,7 @@ private void runInternal() { callExecutor.execute(new MessagesAvailable()); } finally { PerfMark.stopTask("ClientStreamListener.messagesAvailable", tag); + messagesAvailable.observeDuration(); } } @@ -680,10 +719,13 @@ public void closed(Status status, Metadata trailers) { @Override public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) { PerfMark.startTask("ClientStreamListener.closed", tag); + Histogram.Timer closed = + perfmarkClientCallImplDuration.labels("ClientStreamListener.closed").startTimer(); try { closedInternal(status, rpcProgress, trailers); } finally { PerfMark.stopTask("ClientStreamListener.closed", tag); + closed.observeDuration(); } } @@ -715,11 +757,14 @@ final class StreamClosed extends ContextRunnable { @Override public void runInContext() { PerfMark.startTask("ClientCall$Listener.onClose", tag); + Histogram.Timer onClose = + perfmarkClientCallImplDuration.labels("ClientCall$Listener.onClose").startTimer(); PerfMark.linkIn(link); try { runInternal(); } finally { PerfMark.stopTask("ClientCall$Listener.onClose", tag); + onClose.observeDuration(); } } @@ -756,6 +801,8 @@ public void onReady() { } PerfMark.startTask("ClientStreamListener.onReady", tag); + Histogram.Timer onReady = + perfmarkClientCallImplDuration.labels("ClientStreamListener.onReady").startTimer(); final Link link = PerfMark.linkOut(); final class StreamOnReady extends ContextRunnable { @@ -766,11 +813,14 @@ final class StreamOnReady extends ContextRunnable { @Override public void runInContext() { PerfMark.startTask("ClientCall$Listener.onReady", tag); + Histogram.Timer onReady = + perfmarkClientCallImplDuration.labels("ClientCall$Listener.onReady").startTimer(); PerfMark.linkIn(link); try { runInternal(); } finally { PerfMark.stopTask("ClientCall$Listener.onReady", tag); + onReady.observeDuration(); } } @@ -791,6 +841,7 @@ private void runInternal() { callExecutor.execute(new StreamOnReady()); } finally { PerfMark.stopTask("ClientStreamListener.onReady", tag); + onReady.observeDuration(); } } } diff --git a/src/main/java/io/grpc/netty/NettyClientHandler.java b/src/main/java/io/grpc/netty/NettyClientHandler.java index 12fcfb6b9e6..2248cee33ee 100644 --- a/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -79,11 +79,13 @@ import io.netty.handler.logging.LogLevel; import io.perfmark.PerfMark; import io.perfmark.Tag; +import io.prometheus.client.Histogram; import java.nio.channels.ClosedChannelException; import java.util.concurrent.Executor; import 
java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; +import org.tikv.common.util.HistogramUtils; /** * Client-side Netty handler for GRPC processing. All event handlers are executed entirely within @@ -132,6 +134,31 @@ protected void handleNotInUse() { private Status abruptGoAwayStatus; private Status channelInactiveReason; + public static final Histogram createStreamWriteHeaderDuration = + HistogramUtils.buildDuration() + .name("grpc_netty_client_stream_write_header_duration_seconds") + .help("Time taken to write headers for a stream in seconds.") + .register(); + + public static final Histogram createStreamAddListenerDuration = + HistogramUtils.buildDuration() + .name("grpc_netty_client_stream_add_listener_duration_seconds") + .help("Time taken to add listener for a stream future in seconds.") + .register(); + + public static final Histogram createStreamCreateNewFuture = + HistogramUtils.buildDuration() + .name("grpc_netty_client_stream_create_future_duration_seconds") + .help("Time taken to create new stream future in seconds.") + .register(); + + public static final Histogram perfmarkNettyClientHandlerDuration = + HistogramUtils.buildDuration() + .name("perfmark_netty_client_handler_duration_seconds") + .help("Perfmark netty client handler duration seconds") + .labelNames("type") + .register(); + static NettyClientHandler newHandler( ClientTransportLifecycleManager lifecycleManager, @Nullable KeepAliveManager keepAliveManager, @@ -608,12 +635,15 @@ private void createStream(CreateStreamCommand command, ChannelPromise promise) t stream.setId(streamId); PerfMark.startTask("NettyClientHandler.createStream", stream.tag()); + Histogram.Timer createStream = + perfmarkNettyClientHandlerDuration.labels("NettyClientHandler.createStream").startTimer(); PerfMark.linkIn(command.getLink()); try { createStreamTraced( streamId, stream, headers, command.isGet(), command.shouldBeCountedForInUse(), promise); } finally { PerfMark.stopTask("NettyClientHandler.createStream", stream.tag()); + createStream.observeDuration(); } } @@ -626,56 +656,63 @@ private void createStreamTraced( final ChannelPromise promise) { // Create an intermediate promise so that we can intercept the failure reported back to the // application. + Histogram.Timer createFutureTimer = createStreamCreateNewFuture.startTimer(); ChannelPromise tempPromise = ctx().newPromise(); - encoder() - .writeHeaders(ctx(), streamId, headers, 0, isGet, tempPromise) - .addListener( - new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - // The http2Stream will be null in case a stream buffered in the encoder - // was canceled via RST_STREAM. - Http2Stream http2Stream = connection().stream(streamId); - if (http2Stream != null) { - stream.getStatsTraceContext().clientOutboundHeaders(); - http2Stream.setProperty(streamKey, stream); - - // This delays the in-use state until the I/O completes, which technically may - // be later than we would like. - if (shouldBeCountedForInUse) { - inUseState.updateObjectInUse(http2Stream, true); - } + createFutureTimer.observeDuration(); - // Attach the client stream to the HTTP/2 stream object as user data. - stream.setHttp2Stream(http2Stream); - } - // Otherwise, the stream has been cancelled and Netty is sending a - // RST_STREAM frame which causes it to purge pending writes from the - // flow-controller and delete the http2Stream. 
The stream listener has already - // been notified of cancellation so there is nothing to do. - - // Just forward on the success status to the original promise. - promise.setSuccess(); - } else { - final Throwable cause = future.cause(); - if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) { - StreamBufferingEncoder.Http2GoAwayException e = - (StreamBufferingEncoder.Http2GoAwayException) cause; - Status status = - statusFromH2Error( - Status.Code.UNAVAILABLE, - "GOAWAY closed buffered stream", - e.errorCode(), - e.debugData()); - stream.transportReportStatus(status, RpcProgress.REFUSED, true, new Metadata()); - promise.setFailure(status.asRuntimeException()); - } else { - promise.setFailure(cause); - } + Histogram.Timer writeHeaderTimer = createStreamWriteHeaderDuration.startTimer(); + ChannelFuture future = encoder().writeHeaders(ctx(), streamId, headers, 0, isGet, tempPromise); + writeHeaderTimer.observeDuration(); + + Histogram.Timer addListenerTimer = createStreamAddListenerDuration.startTimer(); + future.addListener( + new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + // The http2Stream will be null in case a stream buffered in the encoder + // was canceled via RST_STREAM. + Http2Stream http2Stream = connection().stream(streamId); + if (http2Stream != null) { + stream.getStatsTraceContext().clientOutboundHeaders(); + http2Stream.setProperty(streamKey, stream); + + // This delays the in-use state until the I/O completes, which technically may + // be later than we would like. + if (shouldBeCountedForInUse) { + inUseState.updateObjectInUse(http2Stream, true); } + + // Attach the client stream to the HTTP/2 stream object as user data. + stream.setHttp2Stream(http2Stream); } - }); + // Otherwise, the stream has been cancelled and Netty is sending a + // RST_STREAM frame which causes it to purge pending writes from the + // flow-controller and delete the http2Stream. The stream listener has already + // been notified of cancellation so there is nothing to do. + + // Just forward on the success status to the original promise. + promise.setSuccess(); + } else { + final Throwable cause = future.cause(); + if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) { + StreamBufferingEncoder.Http2GoAwayException e = + (StreamBufferingEncoder.Http2GoAwayException) cause; + Status status = + statusFromH2Error( + Status.Code.UNAVAILABLE, + "GOAWAY closed buffered stream", + e.errorCode(), + e.debugData()); + stream.transportReportStatus(status, RpcProgress.REFUSED, true, new Metadata()); + promise.setFailure(status.asRuntimeException()); + } else { + promise.setFailure(cause); + } + } + } + }); + addListenerTimer.observeDuration(); } /** Cancels this stream. 
*/ @@ -683,6 +720,8 @@ private void cancelStream( ChannelHandlerContext ctx, CancelClientStreamCommand cmd, ChannelPromise promise) { NettyClientStream.TransportState stream = cmd.stream(); PerfMark.startTask("NettyClientHandler.cancelStream", stream.tag()); + Histogram.Timer cancelStream = + perfmarkNettyClientHandlerDuration.labels("NettyClientHandler.cancelStream").startTimer(); PerfMark.linkIn(cmd.getLink()); try { Status reason = cmd.reason(); @@ -696,6 +735,7 @@ private void cancelStream( } } finally { PerfMark.stopTask("NettyClientHandler.cancelStream", stream.tag()); + cancelStream.observeDuration(); } } @@ -703,6 +743,8 @@ private void cancelStream( private void sendGrpcFrame( ChannelHandlerContext ctx, SendGrpcFrameCommand cmd, ChannelPromise promise) { PerfMark.startTask("NettyClientHandler.sendGrpcFrame", cmd.stream().tag()); + Histogram.Timer sendGrpcFrame = + perfmarkNettyClientHandlerDuration.labels("NettyClientHandler.sendGrpcFrame").startTimer(); PerfMark.linkIn(cmd.getLink()); try { // Call the base class to write the HTTP/2 DATA frame. @@ -710,17 +752,21 @@ private void sendGrpcFrame( encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise); } finally { PerfMark.stopTask("NettyClientHandler.sendGrpcFrame", cmd.stream().tag()); + sendGrpcFrame.observeDuration(); } } private void sendPingFrame( ChannelHandlerContext ctx, SendPingCommand msg, ChannelPromise promise) { PerfMark.startTask("NettyClientHandler.sendPingFrame"); + Histogram.Timer sendPingFrame = + perfmarkNettyClientHandlerDuration.labels("NettyClientHandler.sendPingFrame").startTimer(); PerfMark.linkIn(msg.getLink()); try { sendPingFrameTraced(ctx, msg, promise); } finally { PerfMark.stopTask("NettyClientHandler.sendPingFrame"); + sendPingFrame.observeDuration(); } } @@ -807,6 +853,10 @@ public boolean visit(Http2Stream stream) throws Http2Exception { NettyClientStream.TransportState clientStream = clientStream(stream); Tag tag = clientStream != null ? clientStream.tag() : PerfMark.createTag(); PerfMark.startTask("NettyClientHandler.forcefulClose", tag); + Histogram.Timer forcefulClose = + perfmarkNettyClientHandlerDuration + .labels("NettyClientHandler.forcefulClose") + .startTimer(); PerfMark.linkIn(msg.getLink()); try { if (clientStream != null) { @@ -817,6 +867,7 @@ public boolean visit(Http2Stream stream) throws Http2Exception { return true; } finally { PerfMark.stopTask("NettyClientHandler.forcefulClose", tag); + forcefulClose.observeDuration(); } } }); diff --git a/src/main/java/io/grpc/netty/NettyClientStream.java b/src/main/java/io/grpc/netty/NettyClientStream.java index cd94d2524d5..e211274599f 100644 --- a/src/main/java/io/grpc/netty/NettyClientStream.java +++ b/src/main/java/io/grpc/netty/NettyClientStream.java @@ -45,7 +45,9 @@ import io.netty.util.AsciiString; import io.perfmark.PerfMark; import io.perfmark.Tag; +import io.prometheus.client.Histogram; import javax.annotation.Nullable; +import org.tikv.common.util.HistogramUtils; /** Client stream for a Netty transport. Must only be called from the sending application thread. 
*/ class NettyClientStream extends AbstractClientStream { @@ -63,6 +65,13 @@ class NettyClientStream extends AbstractClientStream { private final AsciiString scheme; private final AsciiString userAgent; + public static final Histogram perfmarkNettyClientStreamDuration = + HistogramUtils.buildDuration() + .name("perfmark_netty_client_stream_duration_seconds") + .help("Perfmark netty client stream duration seconds") + .labelNames("type") + .register(); + NettyClientStream( TransportState state, MethodDescriptor method, @@ -115,10 +124,15 @@ private class Sink implements AbstractClientStream.Sink { @Override public void writeHeaders(Metadata headers, byte[] requestPayload) { PerfMark.startTask("NettyClientStream$Sink.writeHeaders"); + Histogram.Timer writeHeaders = + perfmarkNettyClientStreamDuration + .labels("NettyClientStream$Sink.writeHeaders") + .startTimer(); try { writeHeadersInternal(headers, requestPayload); } finally { PerfMark.stopTask("NettyClientStream$Sink.writeHeaders"); + writeHeaders.observeDuration(); } } @@ -207,20 +221,28 @@ public void operationComplete(ChannelFuture future) throws Exception { public void writeFrame( WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) { PerfMark.startTask("NettyClientStream$Sink.writeFrame"); + Histogram.Timer writeFrame = + perfmarkNettyClientStreamDuration + .labels("NettyClientStream$Sink.writeFrame") + .startTimer(); try { writeFrameInternal(frame, endOfStream, flush, numMessages); } finally { PerfMark.stopTask("NettyClientStream$Sink.writeFrame"); + writeFrame.observeDuration(); } } @Override public void cancel(Status status) { PerfMark.startTask("NettyClientStream$Sink.cancel"); + Histogram.Timer cancel = + perfmarkNettyClientStreamDuration.labels("NettyClientStream$Sink.cancel").startTimer(); try { writeQueue.enqueue(new CancelClientStreamCommand(transportState(), status), true); } finally { PerfMark.stopTask("NettyClientStream$Sink.cancel"); + cancel.observeDuration(); } } } diff --git a/src/main/java/io/grpc/netty/WriteQueue.java b/src/main/java/io/grpc/netty/WriteQueue.java index 6cdeca2faa9..203ac42c43c 100644 --- a/src/main/java/io/grpc/netty/WriteQueue.java +++ b/src/main/java/io/grpc/netty/WriteQueue.java @@ -24,9 +24,12 @@ import io.netty.channel.ChannelPromise; import io.perfmark.Link; import io.perfmark.PerfMark; +import io.prometheus.client.Histogram; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.lang3.tuple.Pair; +import org.tikv.common.util.HistogramUtils; /** A queue of pending writes to a {@link Channel} that is flushed as a single unit. 
*/ class WriteQueue { @@ -44,9 +47,56 @@ public void run() { }; private final Channel channel; - private final Queue queue; + private final Queue> queue; private final AtomicBoolean scheduled = new AtomicBoolean(); + public static final Histogram writeQueuePendingDuration = + HistogramUtils.buildDuration() + .name("grpc_netty_write_queue_pending_duration_ms") + .labelNames("type") + .help("Pending duration of a task in the write queue.") + .register(); + + public static final Histogram writeQueueWaitBatchDuration = + HistogramUtils.buildDuration() + .name("grpc_netty_write_queue_wait_batch_duration_seconds") + .help("Duration of waiting a batch filled in the write queue.") + .register(); + + public static final Histogram writeQueueBatchSize = + Histogram.build() + .exponentialBuckets(1, 2, 10) + .name("grpc_netty_write_queue_batch_size") + .help("Number of tasks in a batch in the write queue.") + .register(); + + public static final Histogram writeQueueCmdRunDuration = + HistogramUtils.buildDuration() + .name("grpc_netty_write_queue_cmd_run_duration_seconds") + .help("Duration of a task execution in the write queue.") + .labelNames("type") + .register(); + + public static final Histogram writeQueueChannelFlushDuration = + HistogramUtils.buildDuration() + .name("grpc_netty_write_queue_channel_flush_duration_seconds") + .help("Duration of a channel flush in the write queue.") + .labelNames("phase") + .register(); + + public static final Histogram writeQueueFlushDuration = + HistogramUtils.buildDuration() + .name("grpc_netty_write_queue_flush_duration_seconds") + .help("Duration of a flush of the write queue.") + .register(); + + public static final Histogram perfmarkWriteQueueDuration = + HistogramUtils.buildDuration() + .name("perfmark_write_queue_duration_seconds") + .help("Perfmark write queue duration seconds") + .labelNames("type") + .register(); + public WriteQueue(Channel channel) { this.channel = Preconditions.checkNotNull(channel, "channel"); queue = new ConcurrentLinkedQueue<>(); @@ -76,7 +126,7 @@ ChannelFuture enqueue(QueuedCommand command, boolean flush) { ChannelPromise promise = channel.newPromise(); command.promise(promise); - queue.add(command); + queue.add(Pair.of(command, System.nanoTime())); if (flush) { scheduleFlush(); } @@ -89,7 +139,8 @@ ChannelFuture enqueue(QueuedCommand command, boolean flush) { * processed in-order with writes. 
*/ void enqueue(Runnable runnable, boolean flush) { - queue.add(new RunnableCommand(runnable)); + Long now = System.nanoTime(); + queue.add(Pair.of(new RunnableCommand(runnable), now)); if (flush) { scheduleFlush(); } @@ -113,38 +164,75 @@ void drainNow() { * the event loop */ private void flush() { + Histogram.Timer flushTimer = writeQueueFlushDuration.startTimer(); PerfMark.startTask("WriteQueue.periodicFlush"); + Histogram.Timer periodicFlush = + perfmarkWriteQueueDuration.labels("WriteQueue.periodicFlush").startTimer(); + + long start = System.nanoTime(); try { - QueuedCommand cmd; + Pair item; int i = 0; boolean flushedOnce = false; - while ((cmd = queue.poll()) != null) { + Histogram.Timer waitBatchTimer = writeQueueWaitBatchDuration.startTimer(); + while ((item = queue.poll()) != null) { + QueuedCommand cmd = item.getLeft(); + String cmdName = cmd.getClass().getSimpleName(); + writeQueuePendingDuration + .labels(cmdName) + .observe((System.nanoTime() - item.getRight()) / 1_000_000.0); + + Histogram.Timer cmdTimer = writeQueueCmdRunDuration.labels(cmdName).startTimer(); + + // Run the command cmd.run(channel); + + cmdTimer.observeDuration(); + if (++i == DEQUE_CHUNK_SIZE) { + waitBatchTimer.observeDuration(); i = 0; // Flush each chunk so we are releasing buffers periodically. In theory this loop // might never end as new events are continuously added to the queue, if we never // flushed in that case we would be guaranteed to OOM. PerfMark.startTask("WriteQueue.flush0"); + Histogram.Timer flush0 = + perfmarkWriteQueueDuration.labels("WriteQueue.flush0").startTimer(); + Histogram.Timer channelFlushTimer = + writeQueueChannelFlushDuration.labels("flush0").startTimer(); try { channel.flush(); } finally { + waitBatchTimer = writeQueueWaitBatchDuration.startTimer(); + writeQueueBatchSize.observe(DEQUE_CHUNK_SIZE); + channelFlushTimer.observeDuration(); PerfMark.stopTask("WriteQueue.flush0"); + flush0.observeDuration(); } flushedOnce = true; } } // Must flush at least once, even if there were no writes. if (i != 0 || !flushedOnce) { + waitBatchTimer.observeDuration(); PerfMark.startTask("WriteQueue.flush1"); + Histogram.Timer flush1 = + perfmarkWriteQueueDuration.labels("WriteQueue.flush1").startTimer(); + Histogram.Timer channelFlushTimer = + writeQueueChannelFlushDuration.labels("flush1").startTimer(); try { channel.flush(); } finally { + writeQueueBatchSize.observe(i); + channelFlushTimer.observeDuration(); PerfMark.stopTask("WriteQueue.flush1"); + flush1.observeDuration(); } } } finally { PerfMark.stopTask("WriteQueue.periodicFlush"); + periodicFlush.observeDuration(); + flushTimer.observeDuration(); // Mark the write as done, if the queue is non-empty after marking trigger a new write. scheduled.set(false); if (!queue.isEmpty()) { diff --git a/src/main/java/io/grpc/stub/ClientCalls.java b/src/main/java/io/grpc/stub/ClientCalls.java index ac3dcdf7a62..9e3bed0fec9 100644 --- a/src/main/java/io/grpc/stub/ClientCalls.java +++ b/src/main/java/io/grpc/stub/ClientCalls.java @@ -31,6 +31,7 @@ import io.grpc.Status; import io.grpc.StatusException; import io.grpc.StatusRuntimeException; +import io.prometheus.client.Histogram; import java.util.Iterator; import java.util.NoSuchElementException; import java.util.concurrent.ArrayBlockingQueue; @@ -43,6 +44,7 @@ import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; +import org.tikv.common.util.HistogramUtils; /** * Utility functions for processing different call idioms. 
We have one-to-one correspondence between @@ -53,6 +55,19 @@ public final class ClientCalls { private static final Logger logger = Logger.getLogger(ClientCalls.class.getName()); + public static final Histogram asyncUnaryRequestCallDuration = + HistogramUtils.buildDuration() + .name("grpc_client_async_unary_request_call_duration_seconds") + .help("Histogram of time spent in asyncUnaryRequestCall") + .labelNames("phase") + .register(); + + public static final Histogram blockingUnaryRequestWaitDuration = + HistogramUtils.buildDuration() + .name("grpc_client_blocking_unary_request_wait_duration_seconds") + .help("Histogram of time spent waiting for future in blockingUnaryCall") + .register(); + // Prevent instantiation private ClientCalls() {} @@ -144,8 +159,10 @@ public static RespT blockingUnaryCall( callOptions .withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING) .withExecutor(executor)); + Histogram.Timer waitTimer = null; try { ListenableFuture responseFuture = futureUnaryCall(call, req); + waitTimer = blockingUnaryRequestWaitDuration.startTimer(); while (!responseFuture.isDone()) { try { executor.waitAndDrain(); @@ -163,6 +180,9 @@ public static RespT blockingUnaryCall( // Something very bad happened. All bets are off; it may be dangerous to wait for onClose(). throw cancelThrow(call, e); } finally { + if (waitTimer != null) { + waitTimer.observeDuration(); + } if (interrupt) { Thread.currentThread().interrupt(); } @@ -306,10 +326,20 @@ private static void asyncUnaryRequestCall( private static void asyncUnaryRequestCall( ClientCall call, ReqT req, StartableListener responseListener) { + Histogram.Timer startCallTimer = + asyncUnaryRequestCallDuration.labels("start_call").startTimer(); startCall(call, responseListener); + startCallTimer.observeDuration(); try { + Histogram.Timer sendMessageTimer = + asyncUnaryRequestCallDuration.labels("send_message").startTimer(); call.sendMessage(req); + sendMessageTimer.observeDuration(); + + Histogram.Timer halfCloseTimer = + asyncUnaryRequestCallDuration.labels("half_close").startTimer(); call.halfClose(); + halfCloseTimer.observeDuration(); } catch (RuntimeException e) { throw cancelThrow(call, e); } catch (Error e) { diff --git a/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index 368df7ecc62..1a95ff9c5ba 100644 --- a/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -39,6 +39,7 @@ import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; +import io.prometheus.client.Histogram; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; @@ -49,10 +50,48 @@ import java.nio.channels.spi.SelectorProvider; import java.util.Map; import java.util.concurrent.Executor; +import org.tikv.common.util.HistogramUtils; /** {@link io.netty.channel.socket.SocketChannel} which uses NIO selector based implementation. 
*/ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel { + public static final Histogram socketWriteDuration = + HistogramUtils.buildDuration() + .name("netty_nio_socket_channel_write_duration_seconds") + .help("Time taken to write data to socket") + .register(); + public static final Histogram socketWriteBytes = + HistogramUtils.buildBytes() + .name("netty_nio_socket_channel_write_bytes") + .help("number of bytes for each write call") + .register(); + public static final Histogram socketWrittenBytes = + HistogramUtils.buildBytes() + .name("netty_nio_socket_channel_written_bytes") + .help("number of bytes actually written for each write call") + .register(); + public static final Histogram socketWriteLeftBytes = + HistogramUtils.buildBytes() + .name("netty_nio_socket_channel_write_left_bytes") + .help("number of bytes not written for each write call") + .register(); + public static final Histogram socketReadDuration = + HistogramUtils.buildDuration() + .name("netty_nio_socket_channel_read_duration_seconds") + .help("Time taken to read data to socket") + .register(); + public static final Histogram socketReadBytes = + HistogramUtils.buildBytes() + .name("netty_nio_socket_channel_read_bytes") + .help("number of bytes for each read call") + .register(); + + public static final Histogram socketReadLeftBytes = + HistogramUtils.buildBytes() + .name("netty_nio_socket_channel_read_left_bytes") + .help("number of bytes not read for each read call") + .register(); + private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSocketChannel.class); private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); @@ -349,8 +388,15 @@ protected void doClose() throws Exception { @Override protected int doReadBytes(ByteBuf byteBuf) throws Exception { final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); - allocHandle.attemptedBytesRead(byteBuf.writableBytes()); - return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); + int attemptedBytes = byteBuf.writableBytes(); + allocHandle.attemptedBytesRead(attemptedBytes); + Histogram.Timer socketReadTime = socketReadDuration.startTimer(); + SocketChannel sc = javaChannel(); + int localReadBytes = byteBuf.writeBytes(sc, allocHandle.attemptedBytesRead()); + socketReadTime.observeDuration(); + socketReadBytes.observe(localReadBytes); + socketReadLeftBytes.observe(attemptedBytes - localReadBytes); + return localReadBytes; } @Override @@ -415,11 +461,16 @@ protected void doWrite(ChannelOutboundBuffer in) throws Exception { // to check if the total size of all the buffers is non-zero. ByteBuffer buffer = nioBuffers[0]; int attemptedBytes = buffer.remaining(); + socketWriteBytes.observe(attemptedBytes); + Histogram.Timer writeTime = socketWriteDuration.startTimer(); final int localWrittenBytes = ch.write(buffer); + writeTime.observeDuration(); + socketWrittenBytes.observe(localWrittenBytes); if (localWrittenBytes <= 0) { incompleteWrite(true); return; } + socketWriteLeftBytes.observe(attemptedBytes - localWrittenBytes); adjustMaxBytesPerGatheringWrite( attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite); in.removeBytes(localWrittenBytes); @@ -433,11 +484,16 @@ protected void doWrite(ChannelOutboundBuffer in) throws Exception { // to check if the total size of all the buffers is non-zero. 
// We limit the max amount to int above so cast is safe long attemptedBytes = in.nioBufferSize(); + socketWriteBytes.observe(attemptedBytes); + Histogram.Timer writeTime = socketWriteDuration.startTimer(); final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); + writeTime.observeDuration(); + socketWrittenBytes.observe(localWrittenBytes); if (localWrittenBytes <= 0) { incompleteWrite(true); return; } + socketWriteLeftBytes.observe(attemptedBytes - localWrittenBytes); // Casting to int is safe because we limit the total amount of data in the nioBuffers to // int above. adjustMaxBytesPerGatheringWrite( diff --git a/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameWriter.java b/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameWriter.java index 187308ca8c6..b19ec4a4a38 100644 --- a/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameWriter.java +++ b/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameWriter.java @@ -65,6 +65,8 @@ import io.netty.handler.codec.http2.Http2HeadersEncoder.SensitivityDetector; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.UnstableApi; +import io.prometheus.client.Histogram; +import org.tikv.common.util.HistogramUtils; /** A {@link Http2FrameWriter} that supports all frame types defined by the HTTP/2 specification. */ @UnstableApi @@ -83,6 +85,12 @@ public class DefaultHttp2FrameWriter private final Http2HeadersEncoder headersEncoder; private int maxFrameSize; + public static final Histogram writeHeaderDuration = + HistogramUtils.buildDuration() + .name("netty_http2_frame_writer_write_header_duration_seconds") + .help("Time taken to encode a header") + .register(); + public DefaultHttp2FrameWriter() { this(new DefaultHttp2HeadersEncoder()); } @@ -549,6 +557,7 @@ private ChannelFuture writeHeadersInternal( short weight, boolean exclusive, ChannelPromise promise) { + Histogram.Timer writeHeaderTimer = writeHeaderDuration.startTimer(); ByteBuf headerBlock = null; SimpleChannelPromiseAggregator promiseAggregator = new SimpleChannelPromiseAggregator(promise, ctx.channel(), ctx.executor()); @@ -614,7 +623,9 @@ private ChannelFuture writeHeadersInternal( headerBlock.release(); } } - return promiseAggregator.doneAllocatingPromises(); + ChannelPromise result = promiseAggregator.doneAllocatingPromises(); + writeHeaderTimer.observeDuration(); + return result; } /** diff --git a/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java b/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java index 1e3a9468430..469e0c63894 100644 --- a/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java +++ b/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java @@ -31,8 +31,10 @@ import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; +import io.prometheus.client.Histogram; import java.util.ArrayDeque; import java.util.Deque; +import org.tikv.common.util.HistogramUtils; /** * Basic implementation of {@link Http2RemoteFlowController}. 
@@ -54,6 +56,12 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll private WritabilityMonitor monitor; private ChannelHandlerContext ctx; + public static final Histogram byteDistributedDuration = + HistogramUtils.buildDuration() + .name("netty_http2_byte_distributed_duration_seconds") + .help("The duration of byte distributed to streams.") + .register(); + public DefaultHttp2RemoteFlowController(Http2Connection connection) { this(connection, (Listener) null); } @@ -629,9 +637,10 @@ final void writePendingBytes() throws Http2Exception { // Make sure we always write at least once, regardless if we have bytesToWrite or not. // This ensures that zero-length frames will always be written. for (; ; ) { - if (!streamByteDistributor.distribute(bytesToWrite, this) - || (bytesToWrite = writableBytes()) <= 0 - || !isChannelWritable0()) { + Histogram.Timer distributedTimer = byteDistributedDuration.startTimer(); + boolean distributed = streamByteDistributor.distribute(bytesToWrite, this); + distributedTimer.observeDuration(); + if (!distributed || (bytesToWrite = writableBytes()) <= 0 || !isChannelWritable0()) { break; } } diff --git a/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java b/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java index 8485f9efe0f..feb0f9dbbb7 100644 --- a/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java +++ b/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java @@ -48,9 +48,11 @@ import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; +import io.prometheus.client.Histogram; import java.net.SocketAddress; import java.util.List; import java.util.concurrent.TimeUnit; +import org.tikv.common.util.HistogramUtils; /** * Provides the default implementation for processing inbound frame events and delegates to a {@link @@ -84,6 +86,18 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder private BaseDecoder byteDecoder; private long gracefulShutdownTimeoutMillis; + public static final Histogram flushFlowControlWriteDuration = + HistogramUtils.buildDuration() + .name("netty_http2_flush_flow_control_write_duration_seconds") + .help("The time it takes to flush the pending bytes via flow control in seconds.") + .register(); + + public static final Histogram flushCtxFlushDuration = + HistogramUtils.buildDuration() + .name("netty_http2_flush_ctx_flush_duration_seconds") + .help("The time it takes to ctx flush in seconds.") + .register(); + protected Http2ConnectionHandler( Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, @@ -203,8 +217,13 @@ public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception { public void flush(ChannelHandlerContext ctx) { try { // Trigger pending writes in the remote flow controller. 
+ Histogram.Timer writeTimer = flushFlowControlWriteDuration.startTimer(); encoder.flowController().writePendingBytes(); + writeTimer.observeDuration(); + + Histogram.Timer flushTimer = flushCtxFlushDuration.startTimer(); ctx.flush(); + flushTimer.observeDuration(); } catch (Http2Exception e) { onError(ctx, true, e); } catch (Throwable cause) { diff --git a/src/main/java/io/netty/handler/codec/http2/Http2OutboundFrameLogger.java b/src/main/java/io/netty/handler/codec/http2/Http2OutboundFrameLogger.java index 395bafcfab4..3279b27e5a3 100644 --- a/src/main/java/io/netty/handler/codec/http2/Http2OutboundFrameLogger.java +++ b/src/main/java/io/netty/handler/codec/http2/Http2OutboundFrameLogger.java @@ -23,6 +23,8 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.util.internal.UnstableApi; +import io.prometheus.client.Histogram; +import org.tikv.common.util.HistogramUtils; /** * Decorator around a {@link Http2FrameWriter} that logs all outbound frames before calling the @@ -33,6 +35,12 @@ public class Http2OutboundFrameLogger implements Http2FrameWriter { private final Http2FrameWriter writer; private final Http2FrameLogger logger; + public static final Histogram writeHeaderLogDuration = + HistogramUtils.buildDuration() + .name("netty_http2_write_header_log_duration_seconds") + .help("HTTP/2 write header log duration in seconds") + .register(); + public Http2OutboundFrameLogger(Http2FrameWriter writer, Http2FrameLogger logger) { this.writer = checkNotNull(writer, "writer"); this.logger = checkNotNull(logger, "logger"); @@ -58,7 +66,9 @@ public ChannelFuture writeHeaders( int padding, boolean endStream, ChannelPromise promise) { + Histogram.Timer logTimer = writeHeaderLogDuration.startTimer(); logger.logHeaders(OUTBOUND, ctx, streamId, headers, padding, endStream); + logTimer.observeDuration(); return writer.writeHeaders(ctx, streamId, headers, padding, endStream, promise); } @@ -73,8 +83,10 @@ public ChannelFuture writeHeaders( int padding, boolean endStream, ChannelPromise promise) { + Histogram.Timer logTimer = writeHeaderLogDuration.startTimer(); logger.logHeaders( OUTBOUND, ctx, streamId, headers, streamDependency, weight, exclusive, padding, endStream); + logTimer.observeDuration(); return writer.writeHeaders( ctx, streamId, headers, streamDependency, weight, exclusive, padding, endStream, promise); } diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 7760a2af89b..1072df902ab 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -64,6 +64,7 @@ import org.tikv.common.util.BackOffer; import org.tikv.common.util.ChannelFactory; import org.tikv.common.util.ConcreteBackOffer; +import org.tikv.common.util.HistogramUtils; import org.tikv.common.util.Pair; import org.tikv.kvproto.Metapb; import org.tikv.kvproto.Metapb.Store; @@ -109,7 +110,7 @@ public class PDClient extends AbstractGRPCClient private long lastUpdateLeaderTime; public static final Histogram PD_GET_REGION_BY_KEY_REQUEST_LATENCY = - Histogram.build() + HistogramUtils.buildDuration() .name("client_java_pd_get_region_by_requests_latency") .help("pd getRegionByKey request latency.") .register(); diff --git a/src/main/java/org/tikv/common/policy/RetryPolicy.java b/src/main/java/org/tikv/common/policy/RetryPolicy.java index 2400bab0e6a..6baffa47494 100644 --- a/src/main/java/org/tikv/common/policy/RetryPolicy.java +++ 
b/src/main/java/org/tikv/common/policy/RetryPolicy.java @@ -25,17 +25,18 @@ import org.tikv.common.operation.ErrorHandler; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ConcreteBackOffer; +import org.tikv.common.util.HistogramUtils; public abstract class RetryPolicy { BackOffer backOffer = ConcreteBackOffer.newCopNextMaxBackOff(); public static final Histogram GRPC_SINGLE_REQUEST_LATENCY = - Histogram.build() + HistogramUtils.buildDuration() .name("client_java_grpc_single_requests_latency") .help("grpc request latency.") .labelNames("type") .register(); public static final Histogram CALL_WITH_RETRY_DURATION = - Histogram.build() + HistogramUtils.buildDuration() .name("client_java_call_with_retry_duration") .help("callWithRetry duration.") .labelNames("type") diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index ddbfb37b9bc..7bccbb77d78 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -38,6 +38,7 @@ import org.tikv.common.log.SlowLogSpan; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ChannelFactory; +import org.tikv.common.util.HistogramUtils; import org.tikv.kvproto.Kvrpcpb; import org.tikv.kvproto.Metapb; import org.tikv.kvproto.TikvGrpc; @@ -48,13 +49,13 @@ public abstract class AbstractRegionStoreClient private static final Logger logger = LoggerFactory.getLogger(AbstractRegionStoreClient.class); public static final Histogram SEEK_LEADER_STORE_DURATION = - Histogram.build() + HistogramUtils.buildDuration() .name("client_java_seek_leader_store_duration") .help("seek leader store duration.") .register(); public static final Histogram SEEK_PROXY_STORE_DURATION = - Histogram.build() + HistogramUtils.buildDuration() .name("client_java_seek_proxy_store_duration") .help("seek proxy store duration.") .register(); diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index d18246ff0ef..fdce6465915 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -38,6 +38,7 @@ import org.tikv.common.util.BackOffer; import org.tikv.common.util.ChannelFactory; import org.tikv.common.util.ConcreteBackOffer; +import org.tikv.common.util.HistogramUtils; import org.tikv.common.util.Pair; import org.tikv.kvproto.Metapb; import org.tikv.kvproto.Metapb.Peer; @@ -48,12 +49,12 @@ public class RegionManager { private static final Logger logger = LoggerFactory.getLogger(RegionManager.class); public static final Histogram GET_REGION_BY_KEY_REQUEST_LATENCY = - Histogram.build() + HistogramUtils.buildDuration() .name("client_java_get_region_by_requests_latency") .help("getRegionByKey request latency.") .register(); public static final Histogram SCAN_REGIONS_REQUEST_LATENCY = - Histogram.build() + HistogramUtils.buildDuration() .name("client_java_scan_regions_request_latency") .help("scanRegions request latency.") .register(); diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index 992ff5c101e..056f67885d1 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -73,7 +73,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { private Boolean 
isV4 = null; public static final Histogram GRPC_RAW_REQUEST_LATENCY = - Histogram.build() + HistogramUtils.buildDuration() .name("client_java_grpc_raw_requests_latency") .help("grpc raw request latency.") .labelNames("type") diff --git a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java index b550b9fdd98..337873eb6bf 100644 --- a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java +++ b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java @@ -44,7 +44,7 @@ public class ConcreteBackOffer implements BackOffer { private final SlowLog slowLog; public static final Histogram BACKOFF_DURATION = - Histogram.build() + HistogramUtils.buildDuration() .name("client_java_backoff_duration") .help("backoff duration.") .labelNames("type") diff --git a/src/main/java/org/tikv/common/util/HistogramUtils.java b/src/main/java/org/tikv/common/util/HistogramUtils.java new file mode 100644 index 00000000000..848dc6d62e4 --- /dev/null +++ b/src/main/java/org/tikv/common/util/HistogramUtils.java @@ -0,0 +1,50 @@ +/* + * Copyright 2021 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common.util; + +import io.prometheus.client.Histogram; + +public class HistogramUtils { + private static final double[] DURATION_BUCKETS = + new double[] { + 0.001D, 0.002D, 0.003D, 0.004D, 0.005D, + 0.008D, 0.010D, 0.012D, 0.015D, 0.020D, + 0.025D, 0.030D, 0.035D, 0.040D, 0.045D, + 0.050D, 0.060D, 0.07D, 0.080D, 0.090D, + 0.10D, 0.120D, 0.150D, 0.170D, 0.200D, + 0.4D, 0.5D, 0.6D, 0.7D, 0.8D, + 1D, 2.5D, 5D, 7.5D, 10D, + }; + + private static final double[] BYTES_BUCKETS; + + static { + BYTES_BUCKETS = new double[30]; + for (int i = 0; i < 30; ++i) { + BYTES_BUCKETS[i] = 1 * Math.pow(1.7, i); + } + } + + public static Histogram.Builder buildDuration() { + return Histogram.build().buckets(DURATION_BUCKETS); + } + + public static Histogram.Builder buildBytes() { + return Histogram.build().buckets(BYTES_BUCKETS); + } +} diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index 155a0010f51..419cb2f5f49 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -51,7 +51,7 @@ public class RawKVClient implements RawKVClientBase { private static final Logger logger = LoggerFactory.getLogger(RawKVClient.class); public static final Histogram RAW_REQUEST_LATENCY = - Histogram.build() + HistogramUtils.buildDuration() .name("client_java_raw_requests_latency") .help("client raw request latency.") .labelNames("type") diff --git a/src/main/java/org/tikv/raw/SmartRawKVClient.java b/src/main/java/org/tikv/raw/SmartRawKVClient.java index 31cf97a7364..baf15f87d44 100644 --- a/src/main/java/org/tikv/raw/SmartRawKVClient.java +++ b/src/main/java/org/tikv/raw/SmartRawKVClient.java @@ -23,6 +23,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.exception.CircuitBreakerOpenException; 
+<<<<<<< HEAD +======= +import org.tikv.common.util.HistogramUtils; +import org.tikv.common.util.Pair; +>>>>>>> 4f0ec583... add metrics inside grpc and netty (#484) import org.tikv.common.util.ScanOption; import org.tikv.kvproto.Kvrpcpb; import org.tikv.service.failsafe.CircuitBreaker; @@ -31,7 +36,7 @@ public class SmartRawKVClient implements RawKVClientBase { private static final Logger logger = LoggerFactory.getLogger(SmartRawKVClient.class); private static final Histogram REQUEST_LATENCY = - Histogram.build() + HistogramUtils.buildDuration() .name("client_java_smart_raw_requests_latency") .help("client smart raw request latency.") .labelNames("type") From 84dff88330e0023636935c47e27e376c1cec688e Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Thu, 20 Jan 2022 14:00:25 +0800 Subject: [PATCH 2/2] fix ci Signed-off-by: marsishandsome --- src/main/java/org/tikv/raw/SmartRawKVClient.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/main/java/org/tikv/raw/SmartRawKVClient.java b/src/main/java/org/tikv/raw/SmartRawKVClient.java index baf15f87d44..1c5f886bb51 100644 --- a/src/main/java/org/tikv/raw/SmartRawKVClient.java +++ b/src/main/java/org/tikv/raw/SmartRawKVClient.java @@ -23,11 +23,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.exception.CircuitBreakerOpenException; -<<<<<<< HEAD -======= import org.tikv.common.util.HistogramUtils; -import org.tikv.common.util.Pair; ->>>>>>> 4f0ec583... add metrics inside grpc and netty (#484) import org.tikv.common.util.ScanOption; import org.tikv.kvproto.Kvrpcpb; import org.tikv.service.failsafe.CircuitBreaker;
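
Notes (illustrative, not part of either patch):

The patches repeat one instrumentation pattern around each PerfMark task: a Prometheus Histogram.Timer is started next to PerfMark.startTask and observed in the finally block next to PerfMark.stopTask, so the histogram records wall-clock seconds for the same span PerfMark traces. A minimal sketch of that pattern follows; the class, metric name, and label value are placeholders, not metrics added by the patches.

import io.perfmark.PerfMark;
import io.prometheus.client.Histogram;
import org.tikv.common.util.HistogramUtils;

public final class TimedTaskSketch {
  // Placeholder metric: the patches register one histogram per instrumented class,
  // with a "type" label distinguishing the individual tasks inside it.
  private static final Histogram TASK_DURATION =
      HistogramUtils.buildDuration()
          .name("example_task_duration_seconds")
          .help("Duration of an example task in seconds.")
          .labelNames("type")
          .register();

  static void runTimed(Runnable task) {
    PerfMark.startTask("Example.task");
    Histogram.Timer timer = TASK_DURATION.labels("Example.task").startTimer();
    try {
      task.run();
    } finally {
      PerfMark.stopTask("Example.task");
      // observeDuration() records the elapsed time in seconds into the histogram buckets.
      timer.observeDuration();
    }
  }
}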
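
For reference, the helpers added in HistogramUtils give every duration histogram a fixed set of 35 buckets from 1 ms to 10 s, and every byte-size histogram 30 geometric buckets (1 * 1.7^i for i = 0..29, topping out around 4.8e6 bytes). A throwaway, JDK-only snippet to print the byte buckets:

public final class BytesBucketsPreview {
  public static void main(String[] args) {
    // Same series HistogramUtils.buildBytes() installs: 1 * 1.7^i for i = 0..29.
    for (int i = 0; i < 30; ++i) {
      System.out.printf("bucket %2d: %.1f bytes%n", i, Math.pow(1.7, i));
    }
  }
}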
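
All of the new histograms call register() without an explicit registry, so they land in Prometheus's CollectorRegistry.defaultRegistry. A minimal sketch of exposing them for scraping, assuming the io.prometheus:simpleclient_httpserver artifact is on the classpath and that the application does not already export the default registry some other way (client-java's own metrics server, if enabled, would also pick these up); the port is arbitrary:

import io.prometheus.client.exporter.HTTPServer;

public final class MetricsEndpointSketch {
  public static void main(String[] args) throws Exception {
    // Serves CollectorRegistry.defaultRegistry at http://localhost:9093/metrics,
    // which will include e.g. perfmark_client_call_impl_duration_seconds and
    // grpc_netty_write_queue_flush_duration_seconds once the client code runs.
    HTTPServer server = new HTTPServer(9093);
    Runtime.getRuntime().addShutdownHook(new Thread(server::stop));
  }
}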