Skip to content

Conversation

@k-raina
Copy link
Contributor

@k-raina k-raina commented Jul 10, 2025

Summary

This PR implements dynamic compression type selection and fallback
mechanism for client telemetry to handle cases where compression
libraries are not available on the client classpath.

Problem

Currently, when a compression library is missing (e.g.,
NoClassDefFoundError), the client telemetry system catches the generic
Throwable but doesn't learn from the failure. This means, the same
unsupported compression type will be attempted on every telemetry push

Solution

This PR introduces a comprehensive fallback mechanism:

  • Specific Exception Handling: Replace generic Throwable catching with
    specific exceptions (IOException, NoClassDefFoundError)
  • Unsupported Compression Tracking: Add unsupportedCompressionTypes
    collection to track compression types that have failed due to missing
    libraries
  • Dynamic Selection: Enhance
    ClientTelemetryUtils.preferredCompressionType() to accept an unsupported
    types parameter and filter out known problematic compression types
  • Thread Safety: Use ConcurrentHashMap.newKeySet() for thread-safe
    access to the unsupported types collection
  • Improved Logging: Include exception details in log messages for better
    debugging

Key Changes

  • Modified createPushRequest() to track failed compression types in
    unsupportedCompressionTypes
  • Updated ClientTelemetryUtils.preferredCompressionType() to filter out
    unsupported types
  • Enhanced exception handling with specific exception types instead of
    Throwable

Testing

  • Added appropriate Unit tests
  • Testing apache kafka on local logs:
✗ cat ~/Desktop/kafka-client.log | grep "
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter"
2025-07-17 07:56:52:602 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry subscription request with client instance id
AAAAAAAAAAAAAAAAAAAAAA
2025-07-17 07:56:52:602 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from SUBSCRIPTION_NEEDED to
SUBSCRIPTION_IN_PROGRESS
2025-07-17 07:56:52:640 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from SUBSCRIPTION_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:56:52:640 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Telemetry subscription push interval value from broker was 5000; to
stagger requests the first push interval is being adjusted to 4551
2025-07-17 07:56:52:640 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Updating subscription - subscription:
ClientTelemetrySubscription{clientInstanceId=aVd3fzviRGSgEuAWNY5mMA,
subscriptionId=1650084878, pushIntervalMs=5000,
acceptedCompressionTypes=[zstd, lz4, snappy, none],
deltaTemporality=true,
selector=org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils$$Lambda$308/0x00000005011ce470@2f16e398};
intervalMs: 4551, lastRequestMs: 1752739012639
2025-07-17 07:56:52:640 [kafka-producer-network-thread |
kr-kafka-producer] INFO
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Client telemetry registered with client instance id:
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:56:57:196 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:56:57:196 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:56:57:224 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Compression library zstd not found, sending uncompressed data
        at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:722)
        at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:703)
        at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createRequest(ClientTelemetryReporter.java:389)
2025-07-17 07:56:57:295 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:02:296 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:02:297 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:02:300 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Compression library lz4 not found, sending uncompressed data
        at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:722)
        at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:703)
        at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createRequest(ClientTelemetryReporter.java:389)
2025-07-17 07:57:02:329 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:07:329 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:07:330 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:07:331 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Compression library snappy not found, sending uncompressed data
        at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:722)
        at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:703)
        at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createRequest(ClientTelemetryReporter.java:389)
2025-07-17 07:57:07:344 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:12:346 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:12:346 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:12:400 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:17:402 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:17:402 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:17:442 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:22:442 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:22:442 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:22:508 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:27:512 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:27:512 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:27:555 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:32:555 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:32:555 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:32:578 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:37:580 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:37:580 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:37:606 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:42:606 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:42:606 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:42:646 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:47:647 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:47:647 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:47:673 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:52:673 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:52:673 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:52:711 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:57:711 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:57:711 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:57:765 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED

Reviewers: poorv Mittal apoorvmittal10@gmail.com, Chia-Ping Tsai
chia7712@gmail.com

@github-actions github-actions bot added triage PRs from the community core Kafka Broker clients small Small PRs labels Jul 10, 2025
Copy link
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@k-raina thanks for this patch. I have a suggestion for you.

Copy link
Contributor

@apoorvmittal10 apoorvmittal10 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the the patch. Can we pelase add test cases for the PR as well.

@github-actions github-actions bot removed the triage PRs from the community label Jul 11, 2025
Copy link
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@k-raina thanks for updates. I leave a suggestion based on comment #20144 (comment)

@k-raina k-raina changed the title Minor : Use specific error in compression catch handle KAFKA-19506 : Use specific error in compression catch handle Jul 15, 2025
@k-raina k-raina changed the title KAFKA-19506 : Use specific error in compression catch handle KAFKA-19506 : Implement dynamic compression type selection and fallback for client telemetry Jul 16, 2025
@github-actions github-actions bot removed the small Small PRs label Jul 17, 2025
Copy link
Contributor

@apoorvmittal10 apoorvmittal10 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, some comments.

@k-raina k-raina requested a review from chia7712 July 20, 2025 22:19
Copy link
Contributor

@apoorvmittal10 apoorvmittal10 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the changes, some comments.

@k-raina k-raina requested a review from apoorvmittal10 August 5, 2025 13:50
Copy link
Contributor

@apoorvmittal10 apoorvmittal10 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor comments.

Copy link
Contributor

@apoorvmittal10 apoorvmittal10 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, some comments in tests.

assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.NONE, CompressionType.GZIP)));
assertEquals(CompressionType.GZIP, ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.GZIP, CompressionType.NONE)));
// Test with no unsupported types
assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(Collections.emptyList(), Collections.emptySet()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same elsewhere:

Suggested change
assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(Collections.emptyList(), Collections.emptySet()));
assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(List.of(), Set.of()));

Comment on lines 155 to 156
assertThrows(NullPointerException.class, () ->
ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.LZ4, CompressionType.SNAPPY), null));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not adding any value, already tested in previous line.

Comment on lines 141 to 144
// Test priority order with unsupported types (first available wins)
assertEquals(CompressionType.ZSTD, ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.ZSTD, CompressionType.SNAPPY), Set.of()));
assertEquals(CompressionType.SNAPPY, ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.ZSTD, CompressionType.SNAPPY), Set.of(CompressionType.ZSTD)));
assertEquals(CompressionType.GZIP, ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.LZ4, CompressionType.GZIP, CompressionType.SNAPPY), Set.of(CompressionType.LZ4)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How these tests are different than above // Test with unsupported types filtering?

assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.NONE, CompressionType.GZIP), Collections.emptySet()));
assertEquals(CompressionType.GZIP, ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.GZIP, CompressionType.NONE), Collections.emptySet()));

// Test with unsupported types filtering
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a test when the list has no common match i.e. ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP, CompressionType.LZ4), Set.of(CompressionType.SNAPPY)

@apoorvmittal10
Copy link
Contributor

Build is failing though.

@k-raina k-raina force-pushed the use-specific-error-compression-catch branch from e0a2a15 to abbb5a9 Compare August 21, 2025 18:44
@k-raina
Copy link
Contributor Author

k-raina commented Aug 21, 2025

@apoorvmittal10 Rebased branch. Build should pass now

@k-raina k-raina requested a review from apoorvmittal10 August 21, 2025 18:45
Copy link
Contributor

@apoorvmittal10 apoorvmittal10 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor comment. Please avoid force push, it makes further reviews difficult as previous comments in files are not shown.

assertEquals(CompressionType.GZIP, ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.GZIP, CompressionType.NONE)));
// Test with no unsupported types
assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(List.of(), Set.of()));
assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.NONE, CompressionType.GZIP), Set.of()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use List.of instead of Arrays.asList, same else where. I thought comment here will help you understand that other places can be corrected as well.

Copy link
Contributor Author

@k-raina k-raina Aug 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @apoorvmittal10 for review
Previous comment was unclear, there were multiple instances of Collections.emptyList() ,Collections.emptyList in code unrelated to this PR. I have updated them in commit 60e68e8.

@k-raina k-raina requested a review from apoorvmittal10 August 22, 2025 07:26
Copy link
Contributor

@apoorvmittal10 apoorvmittal10 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! @chia7712 I will wait for your approval as you previouslye reviewed the PR.

@chia7712 chia7712 merged commit b4bf0bf into apache:trunk Aug 23, 2025
25 checks passed
eduwercamacaro pushed a commit to littlehorse-enterprises/kafka that referenced this pull request Nov 12, 2025
… for client telemetry (apache#20144)

#### Summary
This PR implements dynamic compression type selection and fallback
mechanism for client telemetry to handle cases where compression
libraries are not available on the client classpath.

#### Problem
Currently, when a compression library is missing (e.g.,
NoClassDefFoundError), the client telemetry system catches the generic
Throwable but doesn't learn from the failure. This means, the same
unsupported compression type will be attempted on every telemetry push

#### Solution
This PR introduces a comprehensive fallback mechanism:
- Specific Exception Handling: Replace generic Throwable catching with
specific exceptions (IOException, NoClassDefFoundError)
- Unsupported Compression Tracking: Add unsupportedCompressionTypes
collection to track compression types that have failed due to missing
libraries
- Dynamic Selection: Enhance
ClientTelemetryUtils.preferredCompressionType() to accept an unsupported
types parameter and filter out known problematic compression types
- Thread Safety: Use ConcurrentHashMap.newKeySet() for thread-safe
access to the unsupported types collection
- Improved Logging: Include exception details in log messages for better
debugging

#### Key Changes
- Modified createPushRequest() to track failed compression types in
unsupportedCompressionTypes
- Updated ClientTelemetryUtils.preferredCompressionType() to filter out
unsupported types
- Enhanced exception handling with specific exception types instead of
Throwable

#### Testing
- Added appropriate Unit tests
- Testing apache kafka on local logs:
```
✗ cat ~/Desktop/kafka-client.log | grep "
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter"
2025-07-17 07:56:52:602 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry subscription request with client instance id
AAAAAAAAAAAAAAAAAAAAAA
2025-07-17 07:56:52:602 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from SUBSCRIPTION_NEEDED to
SUBSCRIPTION_IN_PROGRESS
2025-07-17 07:56:52:640 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from SUBSCRIPTION_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:56:52:640 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Telemetry subscription push interval value from broker was 5000; to
stagger requests the first push interval is being adjusted to 4551
2025-07-17 07:56:52:640 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Updating subscription - subscription:
ClientTelemetrySubscription{clientInstanceId=aVd3fzviRGSgEuAWNY5mMA,
subscriptionId=1650084878, pushIntervalMs=5000,
acceptedCompressionTypes=[zstd, lz4, snappy, none],
deltaTemporality=true,
selector=org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils$$Lambda$308/0x00000005011ce470@2f16e398};
intervalMs: 4551, lastRequestMs: 1752739012639
2025-07-17 07:56:52:640 [kafka-producer-network-thread |
kr-kafka-producer] INFO
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Client telemetry registered with client instance id:
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:56:57:196 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:56:57:196 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:56:57:224 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Compression library zstd not found, sending uncompressed data
        at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:722)
        at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:703)
        at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createRequest(ClientTelemetryReporter.java:389)
2025-07-17 07:56:57:295 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:02:296 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:02:297 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:02:300 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Compression library lz4 not found, sending uncompressed data
        at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:722)
        at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:703)
        at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createRequest(ClientTelemetryReporter.java:389)
2025-07-17 07:57:02:329 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:07:329 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:07:330 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:07:331 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Compression library snappy not found, sending uncompressed data
        at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:722)
        at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:703)
        at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createRequest(ClientTelemetryReporter.java:389)
2025-07-17 07:57:07:344 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:12:346 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:12:346 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:12:400 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:17:402 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:17:402 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:17:442 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:22:442 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:22:442 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:22:508 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:27:512 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:27:512 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:27:555 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:32:555 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:32:555 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:32:578 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:37:580 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:37:580 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:37:606 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:42:606 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:42:606 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:42:646 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:47:647 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:47:647 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:47:673 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:52:673 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:52:673 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:52:711 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:57:711 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:57:711 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:57:765 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
```

Reviewers: poorv Mittal <apoorvmittal10@gmail.com>, Chia-Ping Tsai
 <chia7712@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants