Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions api/src/main/java/io/grpc/ClientStreamTracer.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,15 @@ public static final class StreamInfo {
private final CallOptions callOptions;
private final int previousAttempts;
private final boolean isTransparentRetry;
private final boolean isHedging;

StreamInfo(
CallOptions callOptions, int previousAttempts, boolean isTransparentRetry) {
CallOptions callOptions, int previousAttempts, boolean isTransparentRetry,
boolean isHedging) {
this.callOptions = checkNotNull(callOptions, "callOptions");
this.previousAttempts = previousAttempts;
this.isTransparentRetry = isTransparentRetry;
this.isHedging = isHedging;
}

/**
Expand Down Expand Up @@ -165,6 +168,15 @@ public boolean isTransparentRetry() {
return isTransparentRetry;
}

/**
* Whether the stream is hedging.
*
* @since 1.74.0
*/
public boolean isHedging() {
return isHedging;
}

/**
* Converts this StreamInfo into a new Builder.
*
Expand All @@ -174,7 +186,9 @@ public Builder toBuilder() {
return new Builder()
.setCallOptions(callOptions)
.setPreviousAttempts(previousAttempts)
.setIsTransparentRetry(isTransparentRetry);
.setIsTransparentRetry(isTransparentRetry)
.setIsHedging(isHedging);

}

/**
Expand All @@ -192,6 +206,7 @@ public String toString() {
.add("callOptions", callOptions)
.add("previousAttempts", previousAttempts)
.add("isTransparentRetry", isTransparentRetry)
.add("isHedging", isHedging)
.toString();
}

Expand All @@ -204,6 +219,7 @@ public static final class Builder {
private CallOptions callOptions = CallOptions.DEFAULT;
private int previousAttempts;
private boolean isTransparentRetry;
private boolean isHedging;

Builder() {
}
Expand Down Expand Up @@ -236,11 +252,21 @@ public Builder setIsTransparentRetry(boolean isTransparentRetry) {
return this;
}

/**
* Sets whether the stream is hedging.
*
* @since 1.74.0
*/
public Builder setIsHedging(boolean isHedging) {
this.isHedging = isHedging;
return this;
}

/**
* Builds a new StreamInfo.
*/
public StreamInfo build() {
return new StreamInfo(callOptions, previousAttempts, isTransparentRetry);
return new StreamInfo(callOptions, previousAttempts, isTransparentRetry, isHedging);
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/io/grpc/internal/ClientCallImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,8 @@ public void runInContext() {
stream = clientStreamProvider.newStream(method, callOptions, headers, context);
} else {
ClientStreamTracer[] tracers =
GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false);
GrpcUtil.getClientStreamTracers(callOptions, headers, 0,
false, false);
String deadlineName = contextIsDeadlineSource ? "Context" : "CallOptions";
Long nameResolutionDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
String description = String.format(
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/java/io/grpc/internal/GrpcUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -759,13 +759,15 @@ public ListenableFuture<SocketStats> getStats() {

/** Gets stream tracers based on CallOptions. */
public static ClientStreamTracer[] getClientStreamTracers(
CallOptions callOptions, Metadata headers, int previousAttempts, boolean isTransparentRetry) {
CallOptions callOptions, Metadata headers, int previousAttempts, boolean isTransparentRetry,
boolean isHedging) {
List<ClientStreamTracer.Factory> factories = callOptions.getStreamTracerFactories();
ClientStreamTracer[] tracers = new ClientStreamTracer[factories.size() + 1];
StreamInfo streamInfo = StreamInfo.newBuilder()
.setCallOptions(callOptions)
.setPreviousAttempts(previousAttempts)
.setIsTransparentRetry(isTransparentRetry)
.setIsHedging(isHedging)
.build();
for (int i = 0; i < factories.size(); i++) {
tracers[i] = factories.get(i).newClientStreamTracer(streamInfo, headers);
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,8 @@ public ClientStream newStream(
// the delayed transport or a real transport will go in-use and cancel the idle timer.
if (!retryEnabled) {
ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
callOptions, headers, 0, /* isTransparentRetry= */ false);
callOptions, headers, 0, /* isTransparentRetry= */ false,
/* isHedging= */false);
Context origContext = context.attach();
try {
return delayedTransport.newStream(method, headers, callOptions, tracers);
Expand Down Expand Up @@ -519,10 +520,10 @@ void postCommit() {
@Override
ClientStream newSubstream(
Metadata newHeaders, ClientStreamTracer.Factory factory, int previousAttempts,
boolean isTransparentRetry) {
boolean isTransparentRetry, boolean isHedgedStream) {
CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
newOptions, newHeaders, previousAttempts, isTransparentRetry);
newOptions, newHeaders, previousAttempts, isTransparentRetry, isHedgedStream);
Context origContext = context.attach();
try {
return delayedTransport.newStream(method, newHeaders, newOptions, tracers);
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/io/grpc/internal/OobChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ final class OobChannel extends ManagedChannel implements InternalInstrumented<Ch
public ClientStream newStream(MethodDescriptor<?, ?> method,
CallOptions callOptions, Metadata headers, Context context) {
ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
callOptions, headers, 0, /* isTransparentRetry= */ false);
callOptions, headers, 0, /* isTransparentRetry= */ false,
/* isHedging= */ false);
Context origContext = context.attach();
// delayed transport's newStream() always acquires a lock, but concurrent performance doesn't
// matter here because OOB communication should be sparse, and it's not on application RPC's
Expand Down
18 changes: 11 additions & 7 deletions core/src/main/java/io/grpc/internal/RetriableStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ private void commitAndRun(Substream winningSubstream) {
// returns null means we should not create new sub streams, e.g. cancelled or
// other close condition is met for retriableStream.
@Nullable
private Substream createSubstream(int previousAttemptCount, boolean isTransparentRetry) {
private Substream createSubstream(int previousAttemptCount, boolean isTransparentRetry,
boolean isHedgedStream) {
int inFlight;
do {
inFlight = inFlightSubStreams.get();
Expand All @@ -266,7 +267,8 @@ public ClientStreamTracer newClientStreamTracer(

Metadata newHeaders = updateHeaders(headers, previousAttemptCount);
// NOTICE: This set _must_ be done before stream.start() and it actually is.
sub.stream = newSubstream(newHeaders, tracerFactory, previousAttemptCount, isTransparentRetry);
sub.stream = newSubstream(newHeaders, tracerFactory, previousAttemptCount, isTransparentRetry,
isHedgedStream);
return sub;
}

Expand All @@ -276,7 +278,7 @@ public ClientStreamTracer newClientStreamTracer(
*/
abstract ClientStream newSubstream(
Metadata headers, ClientStreamTracer.Factory tracerFactory, int previousAttempts,
boolean isTransparentRetry);
boolean isTransparentRetry, boolean isHedgedStream);

/** Adds grpc-previous-rpc-attempts in the headers of a retry/hedging RPC. */
@VisibleForTesting
Expand Down Expand Up @@ -398,7 +400,7 @@ public final void start(ClientStreamListener listener) {
state.buffer.add(new StartEntry());
}

Substream substream = createSubstream(0, false);
Substream substream = createSubstream(0, false, false);
if (substream == null) {
return;
}
Expand Down Expand Up @@ -471,7 +473,7 @@ public void run() {
// If this run is not cancelled, the value of state.hedgingAttemptCount won't change
// until state.addActiveHedge() is called subsequently, even the state could possibly
// change.
Substream newSubstream = createSubstream(state.hedgingAttemptCount, false);
Substream newSubstream = createSubstream(state.hedgingAttemptCount, false, true);
if (newSubstream == null) {
return;
}
Expand Down Expand Up @@ -949,7 +951,8 @@ public void run() {
|| (rpcProgress == RpcProgress.REFUSED
&& noMoreTransparentRetry.compareAndSet(false, true))) {
// transparent retry
final Substream newSubstream = createSubstream(substream.previousAttemptCount, true);
final Substream newSubstream = createSubstream(substream.previousAttemptCount,
true, false);
if (newSubstream == null) {
return;
}
Expand Down Expand Up @@ -1001,7 +1004,8 @@ public void run() {
RetryPlan retryPlan = makeRetryDecision(status, trailers);
if (retryPlan.shouldRetry) {
// retry
Substream newSubstream = createSubstream(substream.previousAttemptCount + 1, false);
Substream newSubstream = createSubstream(substream.previousAttemptCount + 1,
false, false);
if (newSubstream == null) {
return;
}
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/io/grpc/internal/SubchannelChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public ClientStream newStream(MethodDescriptor<?, ?> method,
transport = notReadyTransport;
}
ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
callOptions, headers, 0, /* isTransparentRetry= */ false);
callOptions, headers, 0, /* isTransparentRetry= */ false,
/* isHedging= */ false);
Context origContext = context.attach();
try {
return transport.newStream(method, headers, callOptions, tracers);
Expand Down
3 changes: 2 additions & 1 deletion core/src/test/java/io/grpc/internal/RetriableStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ ClientStream newSubstream(
Metadata metadata,
ClientStreamTracer.Factory tracerFactory,
int previousAttempts,
boolean isTransparentRetry) {
boolean isTransparentRetry,
boolean isHedgedStream) {
bufferSizeTracer =
tracerFactory.newClientStreamTracer(STREAM_INFO, metadata);
int actualPreviousRpcAttemptsInHeader = metadata.get(GRPC_PREVIOUS_RPC_ATTEMPTS) == null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.internal.GrpcUtil.IMPLEMENTATION_VERSION;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.HEDGE_BUCKETS;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LATENCY_BUCKETS;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.RETRY_BUCKETS;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.SIZE_BUCKETS;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.TRANSPARENT_RETRY_BUCKETS;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
Expand Down Expand Up @@ -64,8 +67,8 @@ public Stopwatch get() {
};

@VisibleForTesting
static boolean ENABLE_OTEL_TRACING = GrpcUtil.getFlag("GRPC_EXPERIMENTAL_ENABLE_OTEL_TRACING",
false);
static boolean ENABLE_OTEL_TRACING =
GrpcUtil.getFlag("GRPC_EXPERIMENTAL_ENABLE_OTEL_TRACING", false);

private final OpenTelemetry openTelemetrySdk;
private final MeterProvider meterProvider;
Expand Down Expand Up @@ -241,6 +244,54 @@ static OpenTelemetryMetricsResource createMetricInstruments(Meter meter,
.build());
}

if (isMetricEnabled("grpc.client.call.retries", enableMetrics, disableDefault)) {
builder.clientCallRetriesCounter(
meter.histogramBuilder(
"grpc.client.call.retries")
.setUnit("{retry}")
.setDescription("Number of retries during the client call. "
+ "If there were no retries, 0 is not reported.")
.ofLongs()
.setExplicitBucketBoundariesAdvice(RETRY_BUCKETS)
.build());
}

if (isMetricEnabled("grpc.client.call.transparent_retries", enableMetrics,
disableDefault)) {
builder.clientCallTransparentRetriesCounter(
meter.histogramBuilder(
"grpc.client.call.transparent_retries")
.setUnit("{transparent_retry}")
.setDescription("Number of transparent retries during the client call. "
+ "If there were no transparent retries, 0 is not reported.")
.ofLongs()
.setExplicitBucketBoundariesAdvice(TRANSPARENT_RETRY_BUCKETS)
.build());
}

if (isMetricEnabled("grpc.client.call.hedges", enableMetrics, disableDefault)) {
builder.clientCallHedgesCounter(
meter.histogramBuilder(
"grpc.client.call.hedges")
.setUnit("{hedge}")
.setDescription("Number of hedges during the client call. "
+ "If there were no hedges, 0 is not reported.")
.ofLongs()
.setExplicitBucketBoundariesAdvice(HEDGE_BUCKETS)
.build());
}

if (isMetricEnabled("grpc.client.call.retry_delay", enableMetrics, disableDefault)) {
builder.clientCallRetryDelayCounter(
meter.histogramBuilder(
"grpc.client.call.retry_delay")
.setUnit("s")
.setDescription("Total time of delay while there is no active attempt during the "
+ "client call")
.setExplicitBucketBoundariesAdvice(LATENCY_BUCKETS)
.build());
}

if (isMetricEnabled("grpc.server.call.started", enableMetrics, disableDefault)) {
builder.serverCallCountCounter(
meter.counterBuilder("grpc.server.call.started")
Expand All @@ -259,8 +310,8 @@ static OpenTelemetryMetricsResource createMetricInstruments(Meter meter,
.build());
}

if (isMetricEnabled("grpc.server.call.sent_total_compressed_message_size", enableMetrics,
disableDefault)) {
if (isMetricEnabled("grpc.server.call.sent_total_compressed_message_size",
enableMetrics, disableDefault)) {
builder.serverTotalSentCompressedMessageSizeCounter(
meter.histogramBuilder(
"grpc.server.call.sent_total_compressed_message_size")
Expand All @@ -271,8 +322,8 @@ static OpenTelemetryMetricsResource createMetricInstruments(Meter meter,
.build());
}

if (isMetricEnabled("grpc.server.call.rcvd_total_compressed_message_size", enableMetrics,
disableDefault)) {
if (isMetricEnabled("grpc.server.call.rcvd_total_compressed_message_size",
enableMetrics, disableDefault)) {
builder.serverTotalReceivedCompressedMessageSizeCounter(
meter.histogramBuilder(
"grpc.server.call.rcvd_total_compressed_message_size")
Expand Down
Loading