diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java index e0491943fefa7..bef65977be4c6 100644 --- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java +++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java @@ -50,6 +50,7 @@ import java.util.Optional; import java.util.Set; import java.util.StringJoiner; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; @@ -269,6 +270,7 @@ class DefaultClientTelemetrySender implements ClientTelemetrySender { private static final double INITIAL_PUSH_JITTER_LOWER = 0.5; private static final double INITIAL_PUSH_JITTER_UPPER = 1.5; + private final Set unsupportedCompressionTypes = ConcurrentHashMap.newKeySet(); private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final Condition subscriptionLoaded = lock.writeLock().newCondition(); /* @@ -713,12 +715,26 @@ private Optional> createPushRequest(ClientTelemetrySubscription local return Optional.empty(); } - CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes()); + CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes(), unsupportedCompressionTypes); ByteBuffer compressedPayload; try { compressedPayload = ClientTelemetryUtils.compress(payload, compressionType); } catch (Throwable e) { - log.debug("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType); + // Distinguish between recoverable errors (NoClassDefFoundError for missing compression libs) + // and fatal errors (OutOfMemoryError, etc.) that should terminate telemetry. 
+ if (e instanceof Error && !(e instanceof NoClassDefFoundError) && !(e.getCause() instanceof NoClassDefFoundError)) { + lock.writeLock().lock(); + try { + state = ClientTelemetryState.TERMINATED; + } finally { + lock.writeLock().unlock(); + } + log.error("Unexpected error occurred while compressing telemetry payload for compression: {}, stopping client telemetry", compressionType, e); + throw new KafkaException("Unexpected compression error", e); + } + + log.debug("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType, e); + unsupportedCompressionTypes.add(compressionType); compressedPayload = ByteBuffer.wrap(payload.toByteArray()); compressionType = CompressionType.NONE; } diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java index 3c555afb3b05d..111b041946c6a 100644 --- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java @@ -39,6 +39,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.Predicate; import io.opentelemetry.proto.metrics.v1.MetricsData; @@ -181,13 +182,23 @@ public static boolean validateRequiredResourceLabels(Map metadat return validateResourceLabel(metadata, MetricsContext.NAMESPACE); } - public static CompressionType preferredCompressionType(List acceptedCompressionTypes) { - if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) { - // Broker is providing the compression types in order of preference. Grab the - // first one. - return acceptedCompressionTypes.get(0); - } - return CompressionType.NONE; + /** + * Determines the preferred compression type from broker-accepted types, avoiding unsupported ones. 
+ * + * @param acceptedCompressionTypes the list of compression types accepted by the broker in order + * of preference (must not be null, use empty list if no compression is accepted) + * @param unsupportedCompressionTypes the set of compression types that should be avoided due to + * missing libraries or previous failures (must not be null) + * @return the preferred compression type to use, or {@link CompressionType#NONE} if no acceptable + * compression type is available + */ + public static CompressionType preferredCompressionType(List acceptedCompressionTypes, Set unsupportedCompressionTypes) { + // Broker is providing the compression types in order of preference. Grab the + // first one that's supported. + return acceptedCompressionTypes.stream() + .filter(t -> !unsupportedCompressionTypes.contains(t)) + .findFirst() + .orElse(CompressionType.NONE); } public static ByteBuffer compress(MetricsData metrics, CompressionType compressionType) throws IOException { diff --git a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java index b708b4eeb602d..c06e853b0733f 100644 --- a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java +++ b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; @@ -63,8 +64,10 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static 
org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; public class ClientTelemetryReporterTest { @@ -413,6 +416,134 @@ public void testCreateRequestPushCompressionException() { } } + @Test + public void testCreateRequestPushCompressionFallbackToNextType() { + clientTelemetryReporter.configure(configs); + clientTelemetryReporter.contextChange(metricsContext); + + ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender(); + assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS)); + assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED)); + + // Set up subscription with multiple compression types: GZIP -> LZ4 -> SNAPPY + ClientTelemetryReporter.ClientTelemetrySubscription subscription = new ClientTelemetryReporter.ClientTelemetrySubscription( + uuid, 1234, 20000, List.of(CompressionType.GZIP, CompressionType.LZ4, CompressionType.SNAPPY), true, null); + telemetrySender.updateSubscriptionResult(subscription, time.milliseconds()); + + try (MockedStatic mockedCompress = Mockito.mockStatic(ClientTelemetryUtils.class, new CallsRealMethods())) { + // First request: GZIP fails with NoClassDefFoundError, should use NONE for this request + mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), eq(CompressionType.GZIP))).thenThrow(new NoClassDefFoundError("GZIP not available")); + + Optional> requestOptional = telemetrySender.createRequest(); + assertNotNull(requestOptional); + assertTrue(requestOptional.isPresent()); + assertInstanceOf(PushTelemetryRequest.class, requestOptional.get().build()); + PushTelemetryRequest request = 
(PushTelemetryRequest) requestOptional.get().build(); + + // Should fall back to NONE for this request (GZIP gets cached as unsupported) + assertEquals(CompressionType.NONE.id, request.data().compressionType()); + assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, telemetrySender.state()); + + // Reset state for next request + assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED)); + + // Second request: LZ4 is selected (since GZIP is now cached as unsupported), LZ4 fails, should use NONE + // Note that some libraries, e.g. LZ4, throw KafkaException with cause as NoClassDefFoundError + mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), eq(CompressionType.LZ4))).thenThrow(new KafkaException(new NoClassDefFoundError("LZ4 not available"))); + + requestOptional = telemetrySender.createRequest(); + assertNotNull(requestOptional); + assertTrue(requestOptional.isPresent()); + assertInstanceOf(PushTelemetryRequest.class, requestOptional.get().build()); + request = (PushTelemetryRequest) requestOptional.get().build(); + + // Should fall back to NONE for this request (LZ4 gets cached as unsupported) + assertEquals(CompressionType.NONE.id, request.data().compressionType()); + assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, telemetrySender.state()); + + // Reset state for next request + assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED)); + + // Third request: SNAPPY is selected (since GZIP and LZ4 are now cached as unsupported), SNAPPY fails, should use NONE + mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), eq(CompressionType.SNAPPY))).thenThrow(new NoClassDefFoundError("SNAPPY not available")); + + requestOptional = telemetrySender.createRequest(); + assertNotNull(requestOptional); + assertTrue(requestOptional.isPresent()); + assertInstanceOf(PushTelemetryRequest.class, requestOptional.get().build()); + request = (PushTelemetryRequest) requestOptional.get().build(); + + // Should fall back to NONE for
this request (SNAPPY gets cached as unsupported) + assertEquals(CompressionType.NONE.id, request.data().compressionType()); + assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, telemetrySender.state()); + + // Reset state for next request + assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED)); + + // Fourth request: All compression types are now cached as unsupported, should use NONE directly + requestOptional = telemetrySender.createRequest(); + assertNotNull(requestOptional); + assertTrue(requestOptional.isPresent()); + assertInstanceOf(PushTelemetryRequest.class, requestOptional.get().build()); + request = (PushTelemetryRequest) requestOptional.get().build(); + + // Should use NONE directly (no compression types are supported) + assertEquals(CompressionType.NONE.id, request.data().compressionType()); + assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, telemetrySender.state()); + } + } + + @Test + public void testCreateRequestPushCompressionFallbackAndTermination() { + clientTelemetryReporter.configure(configs); + clientTelemetryReporter.contextChange(metricsContext); + + ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender(); + assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS)); + assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED)); + + // Set up subscription with ZSTD and LZ4 compression types, ZSTD preferred + ClientTelemetryReporter.ClientTelemetrySubscription subscription = new ClientTelemetryReporter.ClientTelemetrySubscription( + uuid, 1234, 20000, List.of(CompressionType.ZSTD, CompressionType.LZ4), true, null); + telemetrySender.updateSubscriptionResult(subscription, time.milliseconds()); + + try (MockedStatic mockedCompress = Mockito.mockStatic(ClientTelemetryUtils.class, new CallsRealMethods())) { + + // === Test 1: NoClassDefFoundError fallback (recoverable) === +
mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), eq(CompressionType.ZSTD))) + .thenThrow(new NoClassDefFoundError("com/github/luben/zstd/BufferPool")); + + assertEquals(ClientTelemetryState.PUSH_NEEDED, telemetrySender.state()); + + Optional> request1 = telemetrySender.createRequest(); + assertNotNull(request1); + assertTrue(request1.isPresent()); + assertInstanceOf(PushTelemetryRequest.class, request1.get().build()); + PushTelemetryRequest pushRequest1 = (PushTelemetryRequest) request1.get().build(); + assertEquals(CompressionType.NONE.id, pushRequest1.data().compressionType()); // Fallback to NONE + assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, telemetrySender.state()); + + // Reset state (simulate successful response handling) + assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED)); + + // === Test 2: OutOfMemoryError causes termination (non-recoverable Error) === + mockedCompress.reset(); + mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), eq(CompressionType.LZ4))) + .thenThrow(new OutOfMemoryError("Out of memory during compression")); + + assertEquals(ClientTelemetryState.PUSH_NEEDED, telemetrySender.state()); + + assertThrows(KafkaException.class, () -> telemetrySender.createRequest()); + assertEquals(ClientTelemetryState.TERMINATED, telemetrySender.state()); + + // === Test 3: After termination, no more requests === + Optional> request3 = telemetrySender.createRequest(); + assertNotNull(request3); + assertFalse(request3.isPresent()); // No request created + assertEquals(ClientTelemetryState.TERMINATED, telemetrySender.state()); // State remains TERMINATED + } + } + @Test public void testHandleResponseGetSubscriptions() { ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender(); diff --git a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java 
b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java index 41679bed3f7ac..47925ff8e0a02 100644 --- a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java @@ -30,10 +30,9 @@ import java.nio.ByteBuffer; import java.time.Instant; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.function.Predicate; import io.opentelemetry.proto.metrics.v1.Metric; @@ -69,12 +68,12 @@ public void testMaybeFetchErrorIntervalMs() { @Test public void testGetSelectorFromRequestedMetrics() { // no metrics selector - assertEquals(ClientTelemetryUtils.SELECTOR_NO_METRICS, ClientTelemetryUtils.getSelectorFromRequestedMetrics(Collections.emptyList())); + assertEquals(ClientTelemetryUtils.SELECTOR_NO_METRICS, ClientTelemetryUtils.getSelectorFromRequestedMetrics(List.of())); assertEquals(ClientTelemetryUtils.SELECTOR_NO_METRICS, ClientTelemetryUtils.getSelectorFromRequestedMetrics(null)); // all metrics selector - assertEquals(ClientTelemetryUtils.SELECTOR_ALL_METRICS, ClientTelemetryUtils.getSelectorFromRequestedMetrics(Collections.singletonList("*"))); + assertEquals(ClientTelemetryUtils.SELECTOR_ALL_METRICS, ClientTelemetryUtils.getSelectorFromRequestedMetrics(List.of("*"))); // specific metrics selector - Predicate selector = ClientTelemetryUtils.getSelectorFromRequestedMetrics(Arrays.asList("metric1", "metric2")); + Predicate selector = ClientTelemetryUtils.getSelectorFromRequestedMetrics(List.of("metric1", "metric2")); assertNotEquals(ClientTelemetryUtils.SELECTOR_NO_METRICS, selector); assertNotEquals(ClientTelemetryUtils.SELECTOR_ALL_METRICS, selector); assertTrue(selector.test(new MetricKey("metric1.test"))); @@ -86,7 +85,7 @@ public void testGetSelectorFromRequestedMetrics() { 
@Test public void testGetCompressionTypesFromAcceptedList() { assertEquals(0, ClientTelemetryUtils.getCompressionTypesFromAcceptedList(null).size()); - assertEquals(0, ClientTelemetryUtils.getCompressionTypesFromAcceptedList(Collections.emptyList()).size()); + assertEquals(0, ClientTelemetryUtils.getCompressionTypesFromAcceptedList(List.of()).size()); List compressionTypes = new ArrayList<>(); compressionTypes.add(CompressionType.GZIP.id); @@ -123,10 +122,24 @@ public void testValidateIntervalMsInvalid(int pushIntervalMs) { @Test public void testPreferredCompressionType() { - assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(Collections.emptyList())); - assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(null)); - assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.NONE, CompressionType.GZIP))); - assertEquals(CompressionType.GZIP, ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.GZIP, CompressionType.NONE))); + // Test with no unsupported types + assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(List.of(), Set.of())); + assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.NONE, CompressionType.GZIP), Set.of())); + assertEquals(CompressionType.GZIP, ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP, CompressionType.NONE), Set.of())); + + // Test unsupported type filtering (returns first available type, or NONE if all are unsupported) + assertEquals(CompressionType.LZ4, ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP, CompressionType.LZ4), Set.of(CompressionType.GZIP))); + assertEquals(CompressionType.SNAPPY, ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP, CompressionType.LZ4, CompressionType.SNAPPY), Set.of(CompressionType.GZIP, CompressionType.LZ4))); + 
assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP, CompressionType.LZ4), Set.of(CompressionType.GZIP, CompressionType.LZ4))); + + // Test edge case: unsupported set has no overlap with the accepted types, so the first accepted type is returned + assertEquals(CompressionType.GZIP, ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP, CompressionType.LZ4), Set.of(CompressionType.SNAPPY))); + + // Test NullPointerException for null parameters + assertThrows(NullPointerException.class, () -> + ClientTelemetryUtils.preferredCompressionType(null, Set.of())); + assertThrows(NullPointerException.class, () -> + ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP, CompressionType.NONE), null)); } @ParameterizedTest @@ -150,19 +163,19 @@ public void testCompressDecompress(CompressionType compressionType) throws IOExc private MetricsData getMetricsData() { List metricsList = new ArrayList<>(); metricsList.add(SinglePointMetric.sum( - new MetricKey("metricName"), 1.0, true, Instant.now(), null, Collections.emptySet()) + new MetricKey("metricName"), 1.0, true, Instant.now(), null, Set.of()) .builder().build()); metricsList.add(SinglePointMetric.sum( - new MetricKey("metricName1"), 100.0, false, Instant.now(), Instant.now(), Collections.emptySet()) + new MetricKey("metricName1"), 100.0, false, Instant.now(), Instant.now(), Set.of()) .builder().build()); metricsList.add(SinglePointMetric.deltaSum( - new MetricKey("metricName2"), 1.0, true, Instant.now(), Instant.now(), Collections.emptySet()) + new MetricKey("metricName2"), 1.0, true, Instant.now(), Instant.now(), Set.of()) .builder().build()); metricsList.add(SinglePointMetric.gauge( - new MetricKey("metricName3"), 1.0, Instant.now(), Collections.emptySet()) + new MetricKey("metricName3"), 1.0, Instant.now(), Set.of()) .builder().build()); metricsList.add(SinglePointMetric.gauge( - new MetricKey("metricName4"), Long.valueOf(100), Instant.now(), Collections.emptySet()) + new
MetricKey("metricName4"), Long.valueOf(100), Instant.now(), Set.of()) .builder().build()); MetricsData.Builder builder = MetricsData.newBuilder();