From e02d26f3ea21cb96903715895a6882e150fb9a5e Mon Sep 17 00:00:00 2001 From: Jack Berg Date: Wed, 15 Nov 2023 18:24:03 -0600 Subject: [PATCH 1/2] Add compressor SPI to support additional compression algos --- .../internal/compression/Compressor.java | 32 +++++++ .../compression/CompressorProvider.java | 18 ++++ .../internal/compression/CompressorUtil.java | 59 ++++++++++++ .../internal/compression/GzipCompressor.java | 37 ++++++++ .../internal/http/HttpExporterBuilder.java | 16 ++-- .../internal/http/HttpSenderProvider.java | 3 +- .../OtlpHttpLogRecordExporterBuilder.java | 15 ++- .../OtlpHttpMetricExporterBuilder.java | 15 ++- .../trace/OtlpHttpSpanExporterBuilder.java | 15 ++- .../otlp/testing-internal/build.gradle.kts | 1 + .../AbstractHttpTelemetryExporterTest.java | 62 +++++++++---- .../testing/internal/lz4/Lz4Compressor.java | 32 +++++++ .../internal/lz4/Lz4CompressorProvider.java | 17 ++++ ...er.internal.compression.CompressorProvider | 1 + .../sender/jdk/internal/JdkHttpSender.java | 20 ++-- .../jdk/internal/JdkHttpSenderProvider.java | 11 +-- .../jdk/internal/JdkHttpSenderTest.java | 4 +- .../okhttp/internal/OkHttpHttpSender.java | 27 +++--- .../internal/OkHttpHttpSenderProvider.java | 5 +- .../internal/HttpExporterBuilderTest.java | 92 ------------------- .../internal/OkHttpHttpSuppressionTest.java | 2 +- 21 files changed, 324 insertions(+), 160 deletions(-) create mode 100644 exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/Compressor.java create mode 100644 exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/CompressorProvider.java create mode 100644 exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/CompressorUtil.java create mode 100644 exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/GzipCompressor.java create mode 100644 exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/lz4/Lz4Compressor.java create mode 100644 exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/lz4/Lz4CompressorProvider.java create mode 100644 exporters/otlp/testing-internal/src/main/resources/META-INF/services/io.opentelemetry.exporter.internal.compression.CompressorProvider delete mode 100644 exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/HttpExporterBuilderTest.java diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/Compressor.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/Compressor.java new file mode 100644 index 00000000000..71894cc9d4a --- /dev/null +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/Compressor.java @@ -0,0 +1,32 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal.compression; + +import java.io.IOException; +import java.io.OutputStream; +import javax.annotation.concurrent.ThreadSafe; + +/** + * An abstraction for compressing messages. Implementation MUST be thread safe as the same instance + * is expected to be used many times and concurrently. Instances are usually singletons. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +@ThreadSafe +public interface Compressor { + + /** + * The name of the compressor encoding. + * + *

Used to identify the compressor during configuration and to populate the {@code + * Content-Encoding} header. + */ + String getEncoding(); + + /** Wrap the {@code outputStream} with a compressing output stream. */ + OutputStream compress(OutputStream outputStream) throws IOException; +} diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/CompressorProvider.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/CompressorProvider.java new file mode 100644 index 00000000000..6b4518f1ea0 --- /dev/null +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/CompressorProvider.java @@ -0,0 +1,18 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal.compression; + +/** + * A service provider interface (SPI) for providing {@link Compressor}s. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public interface CompressorProvider { + + /** Return the {@link Compressor}. */ + Compressor getInstance(); +} diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/CompressorUtil.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/CompressorUtil.java new file mode 100644 index 00000000000..6a777f759ba --- /dev/null +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/CompressorUtil.java @@ -0,0 +1,59 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal.compression; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.ServiceLoader; +import java.util.Set; + +/** + * Utilities for resolving SPI {@link Compressor}s. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + * + * @see CompressorProvider + */ +public final class CompressorUtil { + + private static final Map compressorRegistry = buildCompressorRegistry(); + + private CompressorUtil() {} + + /** Get list of loaded compressors, named according to {@link Compressor#getEncoding()}. */ + public static Set supportedCompressors() { + return Collections.unmodifiableSet(compressorRegistry.keySet()); + } + + /** + * Resolve the {@link Compressor} with the {@link Compressor#getEncoding()} equal to the {@code + * encoding}. + * + * @throws IllegalArgumentException if no match is found + */ + public static Compressor resolveCompressor(String encoding) { + Compressor compressor = compressorRegistry.get(encoding); + if (compressor == null) { + throw new IllegalArgumentException( + "Could not resolve compressor for encoding \"" + encoding + "\"."); + } + return compressor; + } + + private static Map buildCompressorRegistry() { + Map compressors = new HashMap<>(); + for (CompressorProvider spi : + ServiceLoader.load(CompressorProvider.class, CompressorUtil.class.getClassLoader())) { + Compressor compressor = spi.getInstance(); + compressors.put(compressor.getEncoding(), compressor); + } + // Hardcode gzip compressor + compressors.put(GzipCompressor.getInstance().getEncoding(), GzipCompressor.getInstance()); + return compressors; + } +} diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/GzipCompressor.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/GzipCompressor.java new file mode 100644 index 00000000000..7395fdb41b1 --- /dev/null +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/GzipCompressor.java @@ -0,0 +1,37 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal.compression; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.zip.GZIPOutputStream; + +/** + * Gzip {@link Compressor}. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public final class GzipCompressor implements Compressor { + + private static final GzipCompressor INSTANCE = new GzipCompressor(); + + private GzipCompressor() {} + + public static GzipCompressor getInstance() { + return INSTANCE; + } + + @Override + public String getEncoding() { + return "gzip"; + } + + @Override + public OutputStream compress(OutputStream outputStream) throws IOException { + return new GZIPOutputStream(outputStream); + } +} diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java index fc4473b034c..1dc033b527e 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java @@ -11,6 +11,7 @@ import io.opentelemetry.exporter.internal.ExporterBuilderUtil; import io.opentelemetry.exporter.internal.TlsConfigHelper; import io.opentelemetry.exporter.internal.auth.Authenticator; +import io.opentelemetry.exporter.internal.compression.Compressor; import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.sdk.common.export.RetryPolicy; import java.net.URI; @@ -18,6 +19,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.ServiceLoader; import java.util.StringJoiner; import java.util.concurrent.TimeUnit; @@ -46,7 +48,7 @@ public final class HttpExporterBuilder { private String endpoint; private long timeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_TIMEOUT_SECS); - private boolean compressionEnabled = false; + @Nullable private Compressor compressor; private boolean exportAsJson = false; @Nullable private Map headers; @@ -77,8 +79,8 @@ public HttpExporterBuilder setEndpoint(String endpoint) { return this; } - public HttpExporterBuilder setCompression(String compressionMethod) { - this.compressionEnabled = compressionMethod.equals("gzip"); + public HttpExporterBuilder setCompression(@Nullable Compressor compressor) { + this.compressor = compressor; return this; } @@ -133,7 +135,7 @@ public HttpExporterBuilder copy() { copy.endpoint = endpoint; copy.timeoutNanos = timeoutNanos; copy.exportAsJson = exportAsJson; - copy.compressionEnabled = compressionEnabled; + copy.compressor = compressor; if (headers != null) { copy.headers = new HashMap<>(headers); } @@ -154,7 +156,7 @@ public HttpExporter build() { HttpSender httpSender = httpSenderProvider.createSender( endpoint, - compressionEnabled, + compressor, exportAsJson ? "application/json" : "application/x-protobuf", timeoutNanos, headerSupplier, @@ -176,7 +178,9 @@ public String toString(boolean includePrefixAndSuffix) { joiner.add("type=" + type); joiner.add("endpoint=" + endpoint); joiner.add("timeoutNanos=" + timeoutNanos); - joiner.add("compressionEnabled=" + compressionEnabled); + joiner.add( + "compressorEncoding=" + + Optional.ofNullable(compressor).map(Compressor::getEncoding).orElse(null)); joiner.add("exportAsJson=" + exportAsJson); if (headers != null) { StringJoiner headersJoiner = new StringJoiner(", ", "Headers{", "}"); diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpSenderProvider.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpSenderProvider.java index 22050e8e624..79b85045558 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpSenderProvider.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpSenderProvider.java @@ -6,6 +6,7 @@ package io.opentelemetry.exporter.internal.http; import io.opentelemetry.exporter.internal.auth.Authenticator; +import io.opentelemetry.exporter.internal.compression.Compressor; import io.opentelemetry.sdk.common.export.RetryPolicy; import java.util.Map; import java.util.function.Supplier; @@ -25,7 +26,7 @@ public interface HttpSenderProvider { /** Returns a {@link HttpSender} configured with the provided parameters. */ HttpSender createSender( String endpoint, - boolean compressionEnabled, + @Nullable Compressor compressor, String contentType, long timeoutNanos, Supplier> headerSupplier, diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterBuilder.java index 024d6b8b10e..7087eef3461 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterBuilder.java @@ -7,14 +7,17 @@ import static io.opentelemetry.api.internal.Utils.checkArgument; import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.joining; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.exporter.internal.compression.CompressorUtil; import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler; import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent; import io.opentelemetry.sdk.common.export.RetryPolicy; import java.time.Duration; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import javax.net.ssl.SSLContext; @@ -76,10 +79,16 @@ public OtlpHttpLogRecordExporterBuilder setEndpoint(String endpoint) { */ public OtlpHttpLogRecordExporterBuilder setCompression(String compressionMethod) { requireNonNull(compressionMethod, "compressionMethod"); + if (compressionMethod.equals("none")) { + delegate.setCompression(null); + return this; + } + Set supportedCompressionMethods = CompressorUtil.supportedCompressors(); checkArgument( - compressionMethod.equals("gzip") || compressionMethod.equals("none"), - "Unsupported compression method. Supported compression methods include: gzip, none."); - delegate.setCompression(compressionMethod); + supportedCompressionMethods.contains(compressionMethod), + "Unsupported compressionMethod. Compression method must be \"none\" or one of: " + + supportedCompressionMethods.stream().collect(joining(",", "[", "]"))); + delegate.setCompression(CompressorUtil.resolveCompressor(compressionMethod)); return this; } diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterBuilder.java index 57cca1c8f9b..de7d64861c1 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterBuilder.java @@ -7,8 +7,10 @@ import static io.opentelemetry.api.internal.Utils.checkArgument; import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.joining; import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.exporter.internal.compression.CompressorUtil; import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; import io.opentelemetry.exporter.internal.otlp.metrics.MetricsRequestMarshaler; import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent; @@ -18,6 +20,7 @@ import io.opentelemetry.sdk.metrics.export.DefaultAggregationSelector; import io.opentelemetry.sdk.metrics.export.MetricExporter; import java.time.Duration; +import java.util.Set; import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLContext; import javax.net.ssl.X509TrustManager; @@ -87,10 +90,16 @@ public OtlpHttpMetricExporterBuilder setEndpoint(String endpoint) { */ public OtlpHttpMetricExporterBuilder setCompression(String compressionMethod) { requireNonNull(compressionMethod, "compressionMethod"); + if (compressionMethod.equals("none")) { + delegate.setCompression(null); + return this; + } + Set supportedCompressionMethods = CompressorUtil.supportedCompressors(); checkArgument( - compressionMethod.equals("gzip") || compressionMethod.equals("none"), - "Unsupported compression method. Supported compression methods include: gzip, none."); - delegate.setCompression(compressionMethod); + supportedCompressionMethods.contains(compressionMethod), + "Unsupported compressionMethod. Compression method must be \"none\" or one of: " + + supportedCompressionMethods.stream().collect(joining(",", "[", "]"))); + delegate.setCompression(CompressorUtil.resolveCompressor(compressionMethod)); return this; } diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java index 91645cafba5..be077cbda75 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java @@ -7,14 +7,17 @@ import static io.opentelemetry.api.internal.Utils.checkArgument; import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.joining; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.exporter.internal.compression.CompressorUtil; import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler; import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent; import io.opentelemetry.sdk.common.export.RetryPolicy; import java.time.Duration; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import javax.net.ssl.SSLContext; @@ -76,10 +79,16 @@ public OtlpHttpSpanExporterBuilder setEndpoint(String endpoint) { */ public OtlpHttpSpanExporterBuilder setCompression(String compressionMethod) { requireNonNull(compressionMethod, "compressionMethod"); + if (compressionMethod.equals("none")) { + delegate.setCompression(null); + return this; + } + Set supportedCompressionMethods = CompressorUtil.supportedCompressors(); checkArgument( - compressionMethod.equals("gzip") || compressionMethod.equals("none"), - "Unsupported compression method. Supported compression methods include: gzip, none."); - delegate.setCompression(compressionMethod); + supportedCompressionMethods.contains(compressionMethod), + "Unsupported compressionMethod. Compression method must be \"none\" or one of: " + + supportedCompressionMethods.stream().collect(joining(",", "[", "]"))); + delegate.setCompression(CompressorUtil.resolveCompressor(compressionMethod)); return this; } diff --git a/exporters/otlp/testing-internal/build.gradle.kts b/exporters/otlp/testing-internal/build.gradle.kts index 2e491e84c13..ee559c015eb 100644 --- a/exporters/otlp/testing-internal/build.gradle.kts +++ b/exporters/otlp/testing-internal/build.gradle.kts @@ -31,6 +31,7 @@ dependencies { implementation("com.linecorp.armeria:armeria-junit5") implementation("io.github.netmikey.logunit:logunit-jul") implementation("org.assertj:assertj-core") + implementation("org.lz4:lz4-java:1.8.0") } // Skip OWASP dependencyCheck task on test module diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java index be8e5e773fc..882151a9dd0 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java @@ -27,8 +27,10 @@ import com.linecorp.armeria.testing.junit5.server.ServerExtension; import io.github.netmikey.logunit.api.LogCapturer; import io.opentelemetry.exporter.internal.TlsUtil; +import io.opentelemetry.exporter.internal.compression.GzipCompressor; import io.opentelemetry.exporter.internal.http.HttpExporter; import io.opentelemetry.exporter.internal.marshal.Marshaler; +import io.opentelemetry.exporter.otlp.testing.internal.lz4.Lz4Compressor; import io.opentelemetry.internal.testing.slf4j.SuppressLogger; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse; @@ -63,9 +65,11 @@ import javax.net.ssl.TrustManager; import javax.net.ssl.X509KeyManager; import javax.net.ssl.X509TrustManager; +import net.jpountz.lz4.LZ4FrameInputStream; import okio.Buffer; import okio.GzipSource; import okio.Okio; +import okio.Source; import org.assertj.core.api.iterable.ThrowingExtractor; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -163,8 +167,7 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) { aggReq -> { T request; try { - byte[] requestBody = - maybeGzipInflate(aggReq.headers(), aggReq.content().array()); + byte[] requestBody = maybeInflate(aggReq.headers(), aggReq.content().array()); request = parse.extractThrows(requestBody); } catch (IOException e) { throw new UncheckedIOException(e); @@ -181,15 +184,21 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) { return HttpResponse.of(responseFuture); } - private static byte[] maybeGzipInflate(RequestHeaders requestHeaders, byte[] content) + private static byte[] maybeInflate(RequestHeaders requestHeaders, byte[] content) throws IOException { - if (!requestHeaders.contains("content-encoding", "gzip")) { - return content; + if (requestHeaders.contains("content-encoding", "gzip")) { + Buffer buffer = new Buffer(); + GzipSource gzipSource = new GzipSource(Okio.source(new ByteArrayInputStream(content))); + gzipSource.read(buffer, Integer.MAX_VALUE); + return buffer.readByteArray(); } - Buffer buffer = new Buffer(); - GzipSource gzipSource = new GzipSource(Okio.source(new ByteArrayInputStream(content))); - gzipSource.read(buffer, Integer.MAX_VALUE); - return buffer.readByteArray(); + if (requestHeaders.contains("content-encoding", "lz4")) { + Buffer buffer = new Buffer(); + Source lz4Source = Okio.source(new LZ4FrameInputStream(new ByteArrayInputStream(content))); + lz4Source.read(buffer, Integer.MAX_VALUE); + return buffer.readByteArray(); + } + return content; } } @@ -275,9 +284,7 @@ void multipleItems() { void compressionWithNone() { TelemetryExporter exporter = exporterBuilder().setEndpoint(server.httpUri() + path).setCompression("none").build(); - assertThat(exporter.unwrap()) - .extracting("delegate.httpSender.compressionEnabled") - .isEqualTo(false); + assertThat(exporter.unwrap()).extracting("delegate.httpSender.compressor").isNull(); try { CompletableResultCode result = exporter.export(Collections.singletonList(generateFakeTelemetry())); @@ -295,8 +302,8 @@ void compressionWithGzip() { TelemetryExporter exporter = exporterBuilder().setEndpoint(server.httpUri() + path).setCompression("gzip").build(); assertThat(exporter.unwrap()) - .extracting("delegate.httpSender.compressionEnabled") - .isEqualTo(true); + .extracting("delegate.httpSender.compressor") + .isEqualTo(GzipCompressor.getInstance()); try { CompletableResultCode result = exporter.export(Collections.singletonList(generateFakeTelemetry())); @@ -309,6 +316,25 @@ void compressionWithGzip() { } } + @Test + void compressionWithSpiCompressor() { + TelemetryExporter exporter = + exporterBuilder().setEndpoint(server.httpUri() + path).setCompression("lz4").build(); + assertThat(exporter.unwrap()) + .extracting("delegate.httpSender.compressor") + .isEqualTo(Lz4Compressor.getInstance()); + try { + CompletableResultCode result = + exporter.export(Collections.singletonList(generateFakeTelemetry())); + assertThat(result.join(10, TimeUnit.SECONDS).isSuccess()).isTrue(); + assertThat(httpRequests) + .singleElement() + .satisfies(req -> assertThat(req.headers().get("content-encoding")).isEqualTo("lz4")); + } finally { + exporter.shutdown(); + } + } + @Test void authorityWithAuth() { TelemetryExporter exporter = @@ -612,6 +638,8 @@ void validConfig() { .doesNotThrowAnyException(); assertThatCode(() -> exporterBuilder().setCompression("gzip")).doesNotThrowAnyException(); + // SPI compressor available for this test but not packaged with OTLP exporter + assertThatCode(() -> exporterBuilder().setCompression("lz4")).doesNotThrowAnyException(); assertThatCode(() -> exporterBuilder().setCompression("none")).doesNotThrowAnyException(); assertThatCode(() -> exporterBuilder().addHeader("foo", "bar").addHeader("baz", "qux")) @@ -654,7 +682,7 @@ void invalidConfig() { assertThatThrownBy(() -> exporterBuilder().setCompression("foo")) .isInstanceOf(IllegalArgumentException.class) .hasMessage( - "Unsupported compression method. Supported compression methods include: gzip, none."); + "Unsupported compressionMethod. Compression method must be \"none\" or one of: [lz4,gzip]"); } @Test @@ -728,7 +756,7 @@ void stringRepresentation() throws IOException, CertificateEncodingException { + "timeoutNanos=" + TimeUnit.SECONDS.toNanos(10) + ", " - + "compressionEnabled=false, " + + "compressorEncoding=null, " + "exportAsJson=false, " + "headers=Headers\\{User-Agent=OBFUSCATED\\}" + "\\}"); @@ -764,7 +792,7 @@ void stringRepresentation() throws IOException, CertificateEncodingException { + "timeoutNanos=" + TimeUnit.SECONDS.toNanos(5) + ", " - + "compressionEnabled=true, " + + "compressorEncoding=gzip, " + "exportAsJson=false, " + "headers=Headers\\{.*foo=OBFUSCATED.*\\}, " + "retryPolicy=RetryPolicy\\{maxAttempts=2, initialBackoff=PT0\\.05S, maxBackoff=PT3S, backoffMultiplier=1\\.3\\}" diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/lz4/Lz4Compressor.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/lz4/Lz4Compressor.java new file mode 100644 index 00000000000..895a5ff9962 --- /dev/null +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/lz4/Lz4Compressor.java @@ -0,0 +1,32 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.testing.internal.lz4; + +import io.opentelemetry.exporter.internal.compression.Compressor; +import java.io.IOException; +import java.io.OutputStream; +import net.jpountz.lz4.LZ4FrameOutputStream; + +public class Lz4Compressor implements Compressor { + + private static final Lz4Compressor INSTANCE = new Lz4Compressor(); + + private Lz4Compressor() {} + + public static final Lz4Compressor getInstance() { + return INSTANCE; + } + + @Override + public String getEncoding() { + return "lz4"; + } + + @Override + public OutputStream compress(OutputStream outputStream) throws IOException { + return new LZ4FrameOutputStream(outputStream); + } +} diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/lz4/Lz4CompressorProvider.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/lz4/Lz4CompressorProvider.java new file mode 100644 index 00000000000..818f5ddf92c --- /dev/null +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/lz4/Lz4CompressorProvider.java @@ -0,0 +1,17 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.testing.internal.lz4; + +import io.opentelemetry.exporter.internal.compression.Compressor; +import io.opentelemetry.exporter.internal.compression.CompressorProvider; + +public class Lz4CompressorProvider implements CompressorProvider { + + @Override + public Compressor getInstance() { + return Lz4Compressor.getInstance(); + } +} diff --git a/exporters/otlp/testing-internal/src/main/resources/META-INF/services/io.opentelemetry.exporter.internal.compression.CompressorProvider b/exporters/otlp/testing-internal/src/main/resources/META-INF/services/io.opentelemetry.exporter.internal.compression.CompressorProvider new file mode 100644 index 00000000000..732ecf8ac84 --- /dev/null +++ b/exporters/otlp/testing-internal/src/main/resources/META-INF/services/io.opentelemetry.exporter.internal.compression.CompressorProvider @@ -0,0 +1 @@ +io.opentelemetry.exporter.otlp.testing.internal.lz4.Lz4CompressorProvider diff --git a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java index 25866a162fc..a07ad0a29eb 100644 --- a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java +++ b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java @@ -5,6 +5,7 @@ package io.opentelemetry.exporter.sender.jdk.internal; +import io.opentelemetry.exporter.internal.compression.Compressor; import io.opentelemetry.exporter.internal.http.HttpSender; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.export.RetryPolicy; @@ -28,7 +29,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Supplier; -import java.util.zip.GZIPOutputStream; import javax.annotation.Nullable; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLException; @@ -51,7 +51,7 @@ public final class JdkHttpSender implements HttpSender { private final ExecutorService executorService = Executors.newFixedThreadPool(5); private final HttpClient client; private final URI uri; - private final boolean compressionEnabled; + @Nullable private final Compressor compressor; private final String contentType; private final long timeoutNanos; private final Supplier> headerSupplier; @@ -61,7 +61,7 @@ public final class JdkHttpSender implements HttpSender { JdkHttpSender( HttpClient client, String endpoint, - boolean compressionEnabled, + @Nullable Compressor compressor, String contentType, long timeoutNanos, Supplier> headerSupplier, @@ -72,7 +72,7 @@ public final class JdkHttpSender implements HttpSender { } catch (URISyntaxException e) { throw new IllegalArgumentException(e); } - this.compressionEnabled = compressionEnabled; + this.compressor = compressor; this.contentType = contentType; this.timeoutNanos = timeoutNanos; this.headerSupplier = headerSupplier; @@ -81,7 +81,7 @@ public final class JdkHttpSender implements HttpSender { JdkHttpSender( String endpoint, - boolean compressionEnabled, + @Nullable Compressor compressor, String contentType, long timeoutNanos, Supplier> headerSupplier, @@ -90,7 +90,7 @@ public final class JdkHttpSender implements HttpSender { this( configureClient(sslContext), endpoint, - compressionEnabled, + compressor, contentType, timeoutNanos, headerSupplier, @@ -145,10 +145,10 @@ HttpResponse sendInternal(Consumer marshaler) throws IOExc NoCopyByteArrayOutputStream os = threadLocalBaos.get(); os.reset(); - if (compressionEnabled) { - requestBuilder.header("Content-Encoding", "gzip"); - try (GZIPOutputStream gzos = new GZIPOutputStream(os)) { - marshaler.accept(gzos); + if (compressor != null) { + requestBuilder.header("Content-Encoding", compressor.getEncoding()); + try (OutputStream compressed = compressor.compress(os)) { + marshaler.accept(compressed); } catch (IOException e) { throw new IllegalStateException(e); } diff --git a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderProvider.java b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderProvider.java index dbe93eef3a3..44c6d82c45f 100644 --- a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderProvider.java +++ b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderProvider.java @@ -6,6 +6,7 @@ package io.opentelemetry.exporter.sender.jdk.internal; import io.opentelemetry.exporter.internal.auth.Authenticator; +import io.opentelemetry.exporter.internal.compression.Compressor; import io.opentelemetry.exporter.internal.http.HttpSender; import io.opentelemetry.exporter.internal.http.HttpSenderProvider; import io.opentelemetry.sdk.common.export.RetryPolicy; @@ -26,7 +27,7 @@ public final class JdkHttpSenderProvider implements HttpSenderProvider { @Override public HttpSender createSender( String endpoint, - boolean compressionEnabled, + @Nullable Compressor compressor, String contentType, long timeoutNanos, Supplier> headerSupplier, @@ -35,12 +36,6 @@ public HttpSender createSender( @Nullable SSLContext sslContext, @Nullable X509TrustManager trustManager) { return new JdkHttpSender( - endpoint, - compressionEnabled, - contentType, - timeoutNanos, - headerSupplier, - retryPolicy, - sslContext); + endpoint, compressor, contentType, timeoutNanos, headerSupplier, retryPolicy, sslContext); } } diff --git a/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java b/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java index 19cb3f997ac..9a7697d463f 100644 --- a/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java +++ b/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java @@ -52,7 +52,7 @@ void setup() throws IOException, InterruptedException { mockHttpClient, "http://10.255.255.1", // Connecting to a non-routable IP address to trigger connection // timeout - false, + null, "text/plain", Duration.ofSeconds(10).toNanos(), Collections::emptyMap, @@ -96,7 +96,7 @@ void sendInternal_NonRetryableException() throws IOException, InterruptedExcepti void defaultConnectTimeout() { sender = new JdkHttpSender( - "http://localhost", true, "text/plain", 1, Collections::emptyMap, null, null); + "http://localhost", null, "text/plain", 1, Collections::emptyMap, null, null); assertThat(sender) .extracting("client", as(InstanceOfAssertFactories.type(HttpClient.class))) diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java index 2355a94ba60..938f463dfff 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java @@ -8,6 +8,7 @@ import io.opentelemetry.exporter.internal.InstrumentationUtil; import io.opentelemetry.exporter.internal.RetryUtil; import io.opentelemetry.exporter.internal.auth.Authenticator; +import io.opentelemetry.exporter.internal.compression.Compressor; import io.opentelemetry.exporter.internal.http.HttpSender; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.export.RetryPolicy; @@ -29,7 +30,6 @@ import okhttp3.RequestBody; import okhttp3.ResponseBody; import okio.BufferedSink; -import okio.GzipSink; import okio.Okio; /** @@ -42,14 +42,14 @@ public final class OkHttpHttpSender implements HttpSender { private final OkHttpClient client; private final HttpUrl url; - private final boolean compressionEnabled; + @Nullable private final Compressor compressor; private final Supplier> headerSupplier; private final MediaType mediaType; /** Create a sender. */ public OkHttpHttpSender( String endpoint, - boolean compressionEnabled, + @Nullable Compressor compressor, String contentType, long timeoutNanos, Supplier> headerSupplier, @@ -81,7 +81,7 @@ public OkHttpHttpSender( } this.client = builder.build(); this.url = HttpUrl.get(endpoint); - this.compressionEnabled = compressionEnabled; + this.compressor = compressor; this.mediaType = MediaType.parse(contentType); this.headerSupplier = headerSupplier; } @@ -95,9 +95,9 @@ public void send( Request.Builder requestBuilder = new Request.Builder().url(url); headerSupplier.get().forEach(requestBuilder::addHeader); RequestBody body = new RawRequestBody(marshaler, contentLength, mediaType); - if (compressionEnabled) { - requestBuilder.addHeader("Content-Encoding", "gzip"); - requestBuilder.post(new GzipRequestBody(body)); + if (compressor != null) { + requestBuilder.addHeader("Content-Encoding", compressor.getEncoding()); + requestBuilder.post(new CompressedRequestBody(compressor, body)); } else { requestBuilder.post(body); } @@ -179,10 +179,12 @@ public void writeTo(BufferedSink bufferedSink) { } } - private static class GzipRequestBody extends RequestBody { + private static class CompressedRequestBody extends RequestBody { + private final Compressor compressor; private final RequestBody requestBody; - private GzipRequestBody(RequestBody requestBody) { + private CompressedRequestBody(Compressor compressor, RequestBody requestBody) { + this.compressor = compressor; this.requestBody = requestBody; } @@ -198,9 +200,10 @@ public long contentLength() { @Override public void writeTo(BufferedSink bufferedSink) throws IOException { - BufferedSink gzipSink = Okio.buffer(new GzipSink(bufferedSink)); - requestBody.writeTo(gzipSink); - gzipSink.close(); + BufferedSink compressedSink = + Okio.buffer(Okio.sink(compressor.compress(bufferedSink.outputStream()))); + requestBody.writeTo(compressedSink); + compressedSink.close(); } } } diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderProvider.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderProvider.java index edf5e9cf45e..ec95a81dd95 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderProvider.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderProvider.java @@ -6,6 +6,7 @@ package io.opentelemetry.exporter.sender.okhttp.internal; import io.opentelemetry.exporter.internal.auth.Authenticator; +import io.opentelemetry.exporter.internal.compression.Compressor; import io.opentelemetry.exporter.internal.http.HttpSender; import io.opentelemetry.exporter.internal.http.HttpSenderProvider; import io.opentelemetry.sdk.common.export.RetryPolicy; @@ -26,7 +27,7 @@ public final class OkHttpHttpSenderProvider implements HttpSenderProvider { @Override public HttpSender createSender( String endpoint, - boolean compressionEnabled, + @Nullable Compressor compressor, String contentType, long timeoutNanos, Supplier> headerSupplier, @@ -36,7 +37,7 @@ public HttpSender createSender( @Nullable X509TrustManager trustManager) { return new OkHttpHttpSender( endpoint, - compressionEnabled, + compressor, contentType, timeoutNanos, headerSupplier, diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/HttpExporterBuilderTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/HttpExporterBuilderTest.java deleted file mode 100644 index b7b80453be0..00000000000 --- a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/HttpExporterBuilderTest.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.exporter.sender.okhttp.internal; - -import static org.assertj.core.api.Assertions.assertThat; - -import io.opentelemetry.exporter.internal.http.HttpExporter; -import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; -import io.opentelemetry.exporter.internal.marshal.Marshaler; -import org.junit.jupiter.api.Test; - -class HttpExporterBuilderTest { - - private final HttpExporterBuilder builder = - new HttpExporterBuilder<>("otlp", "span", "http://localhost:4318/v1/traces"); - - @Test - void compressionDefault() { - HttpExporter exporter = builder.build(); - try { - assertThat(exporter) - .isInstanceOfSatisfying( - HttpExporter.class, - otlp -> - assertThat(otlp) - .extracting("httpSender") - .isInstanceOf(OkHttpHttpSender.class) - .extracting("compressionEnabled") - .isEqualTo(false)); - } finally { - exporter.shutdown(); - } - } - - @Test - void compressionNone() { - HttpExporter exporter = builder.setCompression("none").build(); - try { - assertThat(exporter) - .isInstanceOfSatisfying( - HttpExporter.class, - otlp -> - assertThat(otlp) - .extracting("httpSender") - .isInstanceOf(OkHttpHttpSender.class) - .extracting("compressionEnabled") - .isEqualTo(false)); - } finally { - exporter.shutdown(); - } - } - - @Test - void compressionGzip() { - HttpExporter exporter = builder.setCompression("gzip").build(); - try { - assertThat(exporter) - .isInstanceOfSatisfying( - HttpExporter.class, - otlp -> - assertThat(otlp) - .extracting("httpSender") - .isInstanceOf(OkHttpHttpSender.class) - .extracting("compressionEnabled") - .isEqualTo(true)); - } finally { - exporter.shutdown(); - } - } - - @Test - void compressionEnabledAndDisabled() { - HttpExporter exporter = - builder.setCompression("gzip").setCompression("none").build(); - try { - assertThat(exporter) - .isInstanceOfSatisfying( - HttpExporter.class, - otlp -> - assertThat(otlp) - .extracting("httpSender") - .isInstanceOf(OkHttpHttpSender.class) - .extracting("compressionEnabled") - .isEqualTo(false)); - } finally { - exporter.shutdown(); - } - } -} diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSuppressionTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSuppressionTest.java index 22c2c4a0dd0..6297aadb4a8 100644 --- a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSuppressionTest.java +++ b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSuppressionTest.java @@ -34,6 +34,6 @@ void send(OkHttpHttpSender sender, Runnable onSuccess, Runnable onFailure) { @Override OkHttpHttpSender createSender(String endpoint) { return new OkHttpHttpSender( - endpoint, false, "text/plain", 10L, Collections::emptyMap, null, null, null, null); + endpoint, null, "text/plain", 10L, Collections::emptyMap, null, null, null, null); } } From 97f78612ddbdf9bc638d34a61ae3964e6fe7ecfb Mon Sep 17 00:00:00 2001 From: Jack Berg Date: Tue, 2 Jan 2024 12:53:09 -0600 Subject: [PATCH 2/2] Use base64 for test compressor instead of lz4 --- .../otlp/testing-internal/build.gradle.kts | 1 - .../AbstractHttpTelemetryExporterTest.java | 21 +++++------ .../internal/compressor/Base64Compressor.java | 35 +++++++++++++++++++ .../Base64CompressorProvider.java} | 6 ++-- .../testing/internal/lz4/Lz4Compressor.java | 32 ----------------- ...er.internal.compression.CompressorProvider | 2 +- 6 files changed, 50 insertions(+), 47 deletions(-) create mode 100644 exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/compressor/Base64Compressor.java rename exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/{lz4/Lz4CompressorProvider.java => compressor/Base64CompressorProvider.java} (60%) delete mode 100644 exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/lz4/Lz4Compressor.java diff --git a/exporters/otlp/testing-internal/build.gradle.kts b/exporters/otlp/testing-internal/build.gradle.kts index ee559c015eb..2e491e84c13 100644 --- a/exporters/otlp/testing-internal/build.gradle.kts +++ b/exporters/otlp/testing-internal/build.gradle.kts @@ -31,7 +31,6 @@ dependencies { implementation("com.linecorp.armeria:armeria-junit5") implementation("io.github.netmikey.logunit:logunit-jul") implementation("org.assertj:assertj-core") - implementation("org.lz4:lz4-java:1.8.0") } // Skip OWASP dependencyCheck task on test module diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java index 99b41ca4cca..3b903ba0c32 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java @@ -30,7 +30,7 @@ import io.opentelemetry.exporter.internal.compression.GzipCompressor; import io.opentelemetry.exporter.internal.http.HttpExporter; import io.opentelemetry.exporter.internal.marshal.Marshaler; -import io.opentelemetry.exporter.otlp.testing.internal.lz4.Lz4Compressor; +import io.opentelemetry.exporter.otlp.testing.internal.compressor.Base64Compressor; import io.opentelemetry.internal.testing.slf4j.SuppressLogger; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse; @@ -51,6 +51,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Base64; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -65,7 +66,6 @@ import javax.net.ssl.TrustManager; import javax.net.ssl.X509KeyManager; import javax.net.ssl.X509TrustManager; -import net.jpountz.lz4.LZ4FrameInputStream; import okio.Buffer; import okio.GzipSource; import okio.Okio; @@ -192,10 +192,11 @@ private static byte[] maybeInflate(RequestHeaders requestHeaders, byte[] content gzipSource.read(buffer, Integer.MAX_VALUE); return buffer.readByteArray(); } - if (requestHeaders.contains("content-encoding", "lz4")) { + if (requestHeaders.contains("content-encoding", "base64")) { Buffer buffer = new Buffer(); - Source lz4Source = Okio.source(new LZ4FrameInputStream(new ByteArrayInputStream(content))); - lz4Source.read(buffer, Integer.MAX_VALUE); + Source base64Source = + Okio.source(Base64.getDecoder().wrap(new ByteArrayInputStream(content))); + base64Source.read(buffer, Integer.MAX_VALUE); return buffer.readByteArray(); } return content; @@ -319,17 +320,17 @@ void compressionWithGzip() { @Test void compressionWithSpiCompressor() { TelemetryExporter exporter = - exporterBuilder().setEndpoint(server.httpUri() + path).setCompression("lz4").build(); + exporterBuilder().setEndpoint(server.httpUri() + path).setCompression("base64").build(); assertThat(exporter.unwrap()) .extracting("delegate.httpSender.compressor") - .isEqualTo(Lz4Compressor.getInstance()); + .isEqualTo(Base64Compressor.getInstance()); try { CompletableResultCode result = exporter.export(Collections.singletonList(generateFakeTelemetry())); assertThat(result.join(10, TimeUnit.SECONDS).isSuccess()).isTrue(); assertThat(httpRequests) .singleElement() - .satisfies(req -> assertThat(req.headers().get("content-encoding")).isEqualTo("lz4")); + .satisfies(req -> assertThat(req.headers().get("content-encoding")).isEqualTo("base64")); } finally { exporter.shutdown(); } @@ -686,7 +687,7 @@ void validConfig() { assertThatCode(() -> exporterBuilder().setCompression("gzip")).doesNotThrowAnyException(); // SPI compressor available for this test but not packaged with OTLP exporter - assertThatCode(() -> exporterBuilder().setCompression("lz4")).doesNotThrowAnyException(); + assertThatCode(() -> exporterBuilder().setCompression("base64")).doesNotThrowAnyException(); assertThatCode(() -> exporterBuilder().setCompression("none")).doesNotThrowAnyException(); assertThatCode(() -> exporterBuilder().addHeader("foo", "bar").addHeader("baz", "qux")) @@ -739,7 +740,7 @@ void invalidConfig() { assertThatThrownBy(() -> exporterBuilder().setCompression("foo")) .isInstanceOf(IllegalArgumentException.class) .hasMessage( - "Unsupported compressionMethod. Compression method must be \"none\" or one of: [lz4,gzip]"); + "Unsupported compressionMethod. Compression method must be \"none\" or one of: [base64,gzip]"); } @Test diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/compressor/Base64Compressor.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/compressor/Base64Compressor.java new file mode 100644 index 00000000000..b0b3121d635 --- /dev/null +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/compressor/Base64Compressor.java @@ -0,0 +1,35 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.testing.internal.compressor; + +import io.opentelemetry.exporter.internal.compression.Compressor; +import java.io.OutputStream; +import java.util.Base64; + +/** + * This exists to test the compressor SPI mechanism but does not actually compress data in any + * useful way. + */ +public class Base64Compressor implements Compressor { + + private static final Base64Compressor INSTANCE = new Base64Compressor(); + + private Base64Compressor() {} + + public static Base64Compressor getInstance() { + return INSTANCE; + } + + @Override + public String getEncoding() { + return "base64"; + } + + @Override + public OutputStream compress(OutputStream outputStream) { + return Base64.getEncoder().wrap(outputStream); + } +} diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/lz4/Lz4CompressorProvider.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/compressor/Base64CompressorProvider.java similarity index 60% rename from exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/lz4/Lz4CompressorProvider.java rename to exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/compressor/Base64CompressorProvider.java index 818f5ddf92c..8d4b4a6cdbc 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/lz4/Lz4CompressorProvider.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/compressor/Base64CompressorProvider.java @@ -3,15 +3,15 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.exporter.otlp.testing.internal.lz4; +package io.opentelemetry.exporter.otlp.testing.internal.compressor; import io.opentelemetry.exporter.internal.compression.Compressor; import io.opentelemetry.exporter.internal.compression.CompressorProvider; -public class Lz4CompressorProvider implements CompressorProvider { +public class Base64CompressorProvider implements CompressorProvider { @Override public Compressor getInstance() { - return Lz4Compressor.getInstance(); + return Base64Compressor.getInstance(); } } diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/lz4/Lz4Compressor.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/lz4/Lz4Compressor.java deleted file mode 100644 index 895a5ff9962..00000000000 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/lz4/Lz4Compressor.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.exporter.otlp.testing.internal.lz4; - -import io.opentelemetry.exporter.internal.compression.Compressor; -import java.io.IOException; -import java.io.OutputStream; -import net.jpountz.lz4.LZ4FrameOutputStream; - -public class Lz4Compressor implements Compressor { - - private static final Lz4Compressor INSTANCE = new Lz4Compressor(); - - private Lz4Compressor() {} - - public static final Lz4Compressor getInstance() { - return INSTANCE; - } - - @Override - public String getEncoding() { - return "lz4"; - } - - @Override - public OutputStream compress(OutputStream outputStream) throws IOException { - return new LZ4FrameOutputStream(outputStream); - } -} diff --git a/exporters/otlp/testing-internal/src/main/resources/META-INF/services/io.opentelemetry.exporter.internal.compression.CompressorProvider b/exporters/otlp/testing-internal/src/main/resources/META-INF/services/io.opentelemetry.exporter.internal.compression.CompressorProvider index 732ecf8ac84..6fac487c249 100644 --- a/exporters/otlp/testing-internal/src/main/resources/META-INF/services/io.opentelemetry.exporter.internal.compression.CompressorProvider +++ b/exporters/otlp/testing-internal/src/main/resources/META-INF/services/io.opentelemetry.exporter.internal.compression.CompressorProvider @@ -1 +1 @@ -io.opentelemetry.exporter.otlp.testing.internal.lz4.Lz4CompressorProvider +io.opentelemetry.exporter.otlp.testing.internal.compressor.Base64CompressorProvider