Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
Expand Down Expand Up @@ -269,6 +270,7 @@ class DefaultClientTelemetrySender implements ClientTelemetrySender {
private static final double INITIAL_PUSH_JITTER_LOWER = 0.5;
private static final double INITIAL_PUSH_JITTER_UPPER = 1.5;

private final Set<CompressionType> unsupportedCompressionTypes = ConcurrentHashMap.newKeySet();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Condition subscriptionLoaded = lock.writeLock().newCondition();
/*
Expand Down Expand Up @@ -713,12 +715,26 @@ private Optional<Builder<?>> createPushRequest(ClientTelemetrySubscription local
return Optional.empty();
}

CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes(), unsupportedCompressionTypes);
ByteBuffer compressedPayload;
try {
compressedPayload = ClientTelemetryUtils.compress(payload, compressionType);
} catch (Throwable e) {
log.debug("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType);
// Distinguish between recoverable errors (NoClassDefFoundError for missing compression libs)
// and fatal errors (OutOfMemoryError, etc.) that should terminate telemetry.
if (e instanceof Error && !(e instanceof NoClassDefFoundError) && !(e.getCause() instanceof NoClassDefFoundError)) {
lock.writeLock().lock();
try {
state = ClientTelemetryState.TERMINATED;
} finally {
lock.writeLock().unlock();
}
log.error("Unexpected error occurred while compressing telemetry payload for compression: {}, stopping client telemetry", compressionType, e);
throw new KafkaException("Unexpected compression error", e);
}

log.debug("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType, e);
unsupportedCompressionTypes.add(compressionType);
compressedPayload = ByteBuffer.wrap(payload.toByteArray());
compressionType = CompressionType.NONE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

import io.opentelemetry.proto.metrics.v1.MetricsData;
Expand Down Expand Up @@ -181,13 +182,23 @@ public static boolean validateRequiredResourceLabels(Map<String, String> metadat
return validateResourceLabel(metadata, MetricsContext.NAMESPACE);
}

public static CompressionType preferredCompressionType(List<CompressionType> acceptedCompressionTypes) {
if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) {
// Broker is providing the compression types in order of preference. Grab the
// first one.
return acceptedCompressionTypes.get(0);
}
return CompressionType.NONE;
/**
* Determines the preferred compression type from broker-accepted types, avoiding unsupported ones.
*
* @param acceptedCompressionTypes the list of compression types accepted by the broker in order
* of preference (must not be null, use empty list if no compression is accepted)
* @param unsupportedCompressionTypes the set of compression types that should be avoided due to
* missing libraries or previous failures (must not be null)
* @return the preferred compression type to use, or {@link CompressionType#NONE} if no acceptable
* compression type is available
*/
public static CompressionType preferredCompressionType(List<CompressionType> acceptedCompressionTypes, Set<CompressionType> unsupportedCompressionTypes) {
// Broker is providing the compression types in order of preference. Grab the
// first one that's supported.
return acceptedCompressionTypes.stream()
.filter(t -> !unsupportedCompressionTypes.contains(t))
.findFirst()
.orElse(CompressionType.NONE);
}

public static ByteBuffer compress(MetricsData metrics, CompressionType compressionType) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
Expand Down Expand Up @@ -63,8 +64,10 @@
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;

public class ClientTelemetryReporterTest {

Expand Down Expand Up @@ -413,6 +416,134 @@ public void testCreateRequestPushCompressionException() {
}
}

@Test
public void testCreateRequestPushCompressionFallbackToNextType() {
clientTelemetryReporter.configure(configs);
clientTelemetryReporter.contextChange(metricsContext);

ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));

// Set up subscription with multiple compression types: GZIP -> LZ4 -> SNAPPY
ClientTelemetryReporter.ClientTelemetrySubscription subscription = new ClientTelemetryReporter.ClientTelemetrySubscription(
uuid, 1234, 20000, List.of(CompressionType.GZIP, CompressionType.LZ4, CompressionType.SNAPPY), true, null);
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());

try (MockedStatic<ClientTelemetryUtils> mockedCompress = Mockito.mockStatic(ClientTelemetryUtils.class, new CallsRealMethods())) {
// First request: GZIP fails with NoClassDefFoundError, should use NONE for this request
mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), eq(CompressionType.GZIP))).thenThrow(new NoClassDefFoundError("GZIP not available"));

Optional<AbstractRequest.Builder<?>> requestOptional = telemetrySender.createRequest();
assertNotNull(requestOptional);
assertTrue(requestOptional.isPresent());
assertInstanceOf(PushTelemetryRequest.class, requestOptional.get().build());
PushTelemetryRequest request = (PushTelemetryRequest) requestOptional.get().build();

// Should fallback to NONE for this request (GZIP gets cached as unsupported)
assertEquals(CompressionType.NONE.id, request.data().compressionType());
assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, telemetrySender.state());

// Reset state for next request
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));

// Second request: LZ4 is selected (since GZIP is now cached as unsupported), LZ4 fails, should use NONE
// Note that some libraries eg. LZ4 return KafkaException with cause as NoClassDefFoundError
mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), eq(CompressionType.LZ4))).thenThrow(new KafkaException(new NoClassDefFoundError("LZ4 not available")));

requestOptional = telemetrySender.createRequest();
assertNotNull(requestOptional);
assertTrue(requestOptional.isPresent());
assertInstanceOf(PushTelemetryRequest.class, requestOptional.get().build());
request = (PushTelemetryRequest) requestOptional.get().build();

// Should fallback to NONE for this request (LZ4 gets cached as unsupported)
assertEquals(CompressionType.NONE.id, request.data().compressionType());
assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, telemetrySender.state());

// Reset state for next request
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));

// Third request: SNAPPY is selected (since GZIP and LZ4 are now cached as unsupported), SNAPPY fails, should use NONE
mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), eq(CompressionType.SNAPPY))).thenThrow(new NoClassDefFoundError("SNAPPY not available"));

requestOptional = telemetrySender.createRequest();
assertNotNull(requestOptional);
assertTrue(requestOptional.isPresent());
assertInstanceOf(PushTelemetryRequest.class, requestOptional.get().build());
request = (PushTelemetryRequest) requestOptional.get().build();

// Should fallback to NONE for this request (SNAPPY gets cached as unsupported)
assertEquals(CompressionType.NONE.id, request.data().compressionType());
assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, telemetrySender.state());

// Reset state for next request
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));

// Fourth request: All compression types are now cached as unsupported, should use NONE directly
requestOptional = telemetrySender.createRequest();
assertNotNull(requestOptional);
assertTrue(requestOptional.isPresent());
assertInstanceOf(PushTelemetryRequest.class, requestOptional.get().build());
request = (PushTelemetryRequest) requestOptional.get().build();

// Should use NONE directly (no compression types are supported)
assertEquals(CompressionType.NONE.id, request.data().compressionType());
assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, telemetrySender.state());
}
}

@Test
public void testCreateRequestPushCompressionFallbackAndTermination() {
clientTelemetryReporter.configure(configs);
clientTelemetryReporter.contextChange(metricsContext);

ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));

// Set up subscription with ZSTD compression type
ClientTelemetryReporter.ClientTelemetrySubscription subscription = new ClientTelemetryReporter.ClientTelemetrySubscription(
uuid, 1234, 20000, List.of(CompressionType.ZSTD, CompressionType.LZ4), true, null);
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());

try (MockedStatic<ClientTelemetryUtils> mockedCompress = Mockito.mockStatic(ClientTelemetryUtils.class, new CallsRealMethods())) {

// === Test 1: NoClassDefFoundError fallback (recoverable) ===
mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), eq(CompressionType.ZSTD)))
.thenThrow(new NoClassDefFoundError("com/github/luben/zstd/BufferPool"));

assertEquals(ClientTelemetryState.PUSH_NEEDED, telemetrySender.state());

Optional<AbstractRequest.Builder<?>> request1 = telemetrySender.createRequest();
assertNotNull(request1);
assertTrue(request1.isPresent());
assertInstanceOf(PushTelemetryRequest.class, request1.get().build());
PushTelemetryRequest pushRequest1 = (PushTelemetryRequest) request1.get().build();
assertEquals(CompressionType.NONE.id, pushRequest1.data().compressionType()); // Fallback to NONE
assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, telemetrySender.state());

// Reset state (simulate successful response handling)
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));

// === Test 2: OutOfMemoryError causes termination (non-recoverable Error) ===
mockedCompress.reset();
mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), eq(CompressionType.LZ4)))
.thenThrow(new OutOfMemoryError("Out of memory during compression"));

assertEquals(ClientTelemetryState.PUSH_NEEDED, telemetrySender.state());

assertThrows(KafkaException.class, () -> telemetrySender.createRequest());
assertEquals(ClientTelemetryState.TERMINATED, telemetrySender.state());

// === Test 3: After termination, no more requests ===
Optional<AbstractRequest.Builder<?>> request3 = telemetrySender.createRequest();
assertNotNull(request3);
assertFalse(request3.isPresent()); // No request created
assertEquals(ClientTelemetryState.TERMINATED, telemetrySender.state()); // State remains TERMINATED
}
}

@Test
public void testHandleResponseGetSubscriptions() {
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
Expand Down
Loading