Skip to content

Commit

Permalink
Return status code exceptions via CompletableResultCode in GrpcExport…
Browse files Browse the repository at this point in the history
…er and HttpExporter (#6645)
  • Loading branch information
jarrodrobins authored Aug 27, 2024
1 parent 7522bfe commit 71124d2
Show file tree
Hide file tree
Showing 7 changed files with 284 additions and 48 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal;

import io.opentelemetry.exporter.internal.grpc.GrpcResponse;
import io.opentelemetry.exporter.internal.http.HttpSender;
import javax.annotation.Nullable;

/**
* Represents the failure of a gRPC or HTTP exporter.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public abstract class FailedExportException extends Exception {

private static final long serialVersionUID = 6988924855140178789L;

private FailedExportException(@Nullable Throwable cause) {
super(cause);
}

/** Indicates an HTTP export failed after receiving a response from the server. */
public static HttpExportException httpFailedWithResponse(HttpSender.Response response) {
return new HttpExportException(response, null);
}

/** Indicates an HTTP export failed exceptionally without receiving a response from the server. */
public static HttpExportException httpFailedExceptionally(Throwable cause) {
return new HttpExportException(null, cause);
}

/** Indicates a gRPC export failed after receiving a response from the server. */
public static GrpcExportException grpcFailedWithResponse(GrpcResponse response) {
return new GrpcExportException(response, null);
}

/** Indicates a gRPC export failed exceptionally without receiving a response from the server. */
public static GrpcExportException grpcFailedExceptionally(Throwable cause) {
return new GrpcExportException(null, cause);
}

/** Returns true if the export failed with a response from the server. */
public abstract boolean failedWithResponse();

/** Represents the failure of an HTTP exporter. */
public static final class HttpExportException extends FailedExportException {

private static final long serialVersionUID = -6787390183017184775L;

@Nullable private final HttpSender.Response response;
@Nullable private final Throwable cause;

private HttpExportException(@Nullable HttpSender.Response response, @Nullable Throwable cause) {
super(cause);
this.response = response;
this.cause = cause;
}

@Override
public boolean failedWithResponse() {
return response != null;
}

/**
* Returns the response if the export failed with a response from the server, or null if the
* export failed exceptionally with no response.
*/
@Nullable
public HttpSender.Response getResponse() {
return response;
}

/**
* Returns the exceptional cause of failure, or null if the export failed with a response from
* the server.
*/
@Nullable
@Override
public Throwable getCause() {
return cause;
}
}

/** Represents the failure of a gRPC exporter. */
public static final class GrpcExportException extends FailedExportException {

private static final long serialVersionUID = -9157548250286695364L;

@Nullable private final GrpcResponse response;
@Nullable private final Throwable cause;

private GrpcExportException(@Nullable GrpcResponse response, @Nullable Throwable cause) {
super(cause);
this.response = response;
this.cause = cause;
}

@Override
public boolean failedWithResponse() {
return response != null;
}

/**
* Returns the response if the export failed with a response from the server, or null if the
* export failed exceptionally with no response.
*/
@Nullable
public GrpcResponse getResponse() {
return response;
}

/**
* Returns the exceptional cause of failure, or null if the export failed with a response from
* the server.
*/
@Nullable
@Override
public Throwable getCause() {
return cause;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@

import static io.opentelemetry.exporter.internal.grpc.GrpcExporterUtil.GRPC_STATUS_UNAVAILABLE;
import static io.opentelemetry.exporter.internal.grpc.GrpcExporterUtil.GRPC_STATUS_UNIMPLEMENTED;
import static io.opentelemetry.exporter.internal.grpc.GrpcExporterUtil.GRPC_STATUS_UNKNOWN;

import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.internal.ExporterMetrics;
import io.opentelemetry.exporter.internal.FailedExportException;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
Expand Down Expand Up @@ -66,39 +68,17 @@ public CompletableResultCode export(T exportRequest, int numItems) {
},
(response, throwable) -> {
exporterMetrics.addFailed(numItems);

logFailureMessage(response, throwable);

switch (response.grpcStatusValue()) {
case GRPC_STATUS_UNIMPLEMENTED:
if (loggedUnimplemented.compareAndSet(false, true)) {
GrpcExporterUtil.logUnimplemented(
internalLogger, type, response.grpcStatusDescription());
}
break;
case GRPC_STATUS_UNAVAILABLE:
logger.log(
Level.SEVERE,
"Failed to export "
+ type
+ "s. Server is UNAVAILABLE. "
+ "Make sure your collector is running and reachable from this network. "
+ "Full error message:"
+ response.grpcStatusDescription());
case GRPC_STATUS_UNKNOWN:
result.failExceptionally(FailedExportException.grpcFailedExceptionally(throwable));
break;
default:
logger.log(
Level.WARNING,
"Failed to export "
+ type
+ "s. Server responded with gRPC status code "
+ response.grpcStatusValue()
+ ". Error message: "
+ response.grpcStatusDescription());
result.failExceptionally(FailedExportException.grpcFailedWithResponse(response));
break;
}
if (logger.isLoggable(Level.FINEST)) {
logger.log(
Level.FINEST, "Failed to export " + type + "s. Details follow: " + throwable);
}
result.fail();
});

return result;
Expand All @@ -111,4 +91,38 @@ public CompletableResultCode shutdown() {
}
return grpcSender.shutdown();
}

private void logFailureMessage(GrpcResponse response, Throwable throwable) {
switch (response.grpcStatusValue()) {
case GRPC_STATUS_UNIMPLEMENTED:
if (loggedUnimplemented.compareAndSet(false, true)) {
GrpcExporterUtil.logUnimplemented(internalLogger, type, response.grpcStatusDescription());
}
break;
case GRPC_STATUS_UNAVAILABLE:
logger.log(
Level.SEVERE,
"Failed to export "
+ type
+ "s. Server is UNAVAILABLE. "
+ "Make sure your collector is running and reachable from this network. "
+ "Full error message:"
+ response.grpcStatusDescription());
break;
default:
logger.log(
Level.WARNING,
"Failed to export "
+ type
+ "s. Server responded with gRPC status code "
+ response.grpcStatusValue()
+ ". Error message: "
+ response.grpcStatusDescription());
break;
}

if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "Failed to export " + type + "s. Details follow: " + throwable);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.internal.ExporterMetrics;
import io.opentelemetry.exporter.internal.FailedExportException;
import io.opentelemetry.exporter.internal.grpc.GrpcExporterUtil;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
Expand Down Expand Up @@ -90,7 +91,8 @@ public CompletableResultCode export(T exportRequest, int numItems) {
+ statusCode
+ ". Error message: "
+ status);
result.fail();

result.failExceptionally(FailedExportException.httpFailedWithResponse(httpResponse));
},
e -> {
exporterMetrics.addFailed(numItems);
Expand All @@ -101,7 +103,7 @@ public CompletableResultCode export(T exportRequest, int numItems) {
+ "s. The request could not be executed. Full error message: "
+ e.getMessage(),
e);
result.fail();
result.failExceptionally(FailedExportException.httpFailedExceptionally(e));
});

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import com.linecorp.armeria.testing.junit5.server.ServerExtension;
import io.github.netmikey.logunit.api.LogCapturer;
import io.grpc.ManagedChannel;
import io.opentelemetry.exporter.internal.FailedExportException;
import io.opentelemetry.exporter.internal.TlsUtil;
import io.opentelemetry.exporter.internal.compression.GzipCompressor;
import io.opentelemetry.exporter.internal.grpc.GrpcExporter;
import io.opentelemetry.exporter.internal.grpc.GrpcResponse;
import io.opentelemetry.exporter.internal.grpc.MarshalerServiceStub;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.otlp.testing.internal.compressor.Base64Compressor;
Expand Down Expand Up @@ -66,6 +68,7 @@
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509KeyManager;
import javax.net.ssl.X509TrustManager;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.assertj.core.api.iterable.ThrowingExtractor;
import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -486,6 +489,7 @@ void connectTimeout() {
CompletableResultCode result =
exporter.export(Collections.singletonList(generateFakeTelemetry()));
assertThat(result.join(10, TimeUnit.SECONDS).isSuccess()).isFalse();

// Assert that the export request fails well before the default connect timeout of 10s
assertThat(System.currentTimeMillis() - startTimeMillis)
.isLessThan(TimeUnit.SECONDS.toMillis(1));
Expand Down Expand Up @@ -540,17 +544,33 @@ void doubleShutdown() {
@Test
@SuppressLogger(GrpcExporter.class)
void error() {
addGrpcError(13, null);
int statusCode = 13;
addGrpcError(statusCode, null);

TelemetryExporter<T> exporter = nonRetryingExporter();

try {
assertThat(
exporter
.export(Collections.singletonList(generateFakeTelemetry()))
.join(10, TimeUnit.SECONDS)
.isSuccess())
.isFalse();
CompletableResultCode result =
exporter
.export(Collections.singletonList(generateFakeTelemetry()))
.join(10, TimeUnit.SECONDS);

assertThat(result.isSuccess()).isFalse();

assertThat(result.getFailureThrowable())
.asInstanceOf(
InstanceOfAssertFactories.throwable(FailedExportException.GrpcExportException.class))
.returns(true, Assertions.from(FailedExportException::failedWithResponse))
.satisfies(
ex -> {
assertThat(ex.getResponse())
.isNotNull()
.extracting(GrpcResponse::grpcStatusValue)
.isEqualTo(statusCode);

assertThat(ex.getCause()).isNull();
});

LoggingEvent log =
logs.assertContains(
"Failed to export "
Expand All @@ -562,6 +582,33 @@ void error() {
}
}

@Test
@SuppressLogger(GrpcExporter.class)
void errorWithUnknownError() {
addGrpcError(2, null);

TelemetryExporter<T> exporter = nonRetryingExporter();

try {
assertThat(
exporter
.export(Collections.singletonList(generateFakeTelemetry()))
.join(10, TimeUnit.SECONDS)
.getFailureThrowable())
.asInstanceOf(
InstanceOfAssertFactories.throwable(FailedExportException.GrpcExportException.class))
.returns(false, Assertions.from(FailedExportException::failedWithResponse))
.satisfies(
ex -> {
assertThat(ex.getResponse()).isNull();

assertThat(ex.getCause()).isNotNull();
});
} finally {
exporter.shutdown();
}
}

@Test
@SuppressLogger(GrpcExporter.class)
void errorWithMessage() {
Expand Down
Loading

0 comments on commit 71124d2

Please sign in to comment.