From 06062a44449b4e837ca914d3fcbc1ae1e39e6260 Mon Sep 17 00:00:00 2001
From: bcol-google <134437113+bcol-google@users.noreply.github.com>
Date: Wed, 2 Oct 2024 18:23:16 -0400
Subject: [PATCH] fix: Update kafka-clients to 3.8.0 past snappy CVEs and
implement dummy `clientInstanceId` (#482)
---
pom.xml | 2 +-
.../google/cloud/pubsublite/kafka/PubsubLiteConsumer.java | 7 +++++++
.../google/cloud/pubsublite/kafka/PubsubLiteProducer.java | 7 +++++++
3 files changed, 15 insertions(+), 1 deletion(-)
diff --git a/pom.xml b/pom.xml
index d9322399..ae247573 100644
--- a/pom.xml
+++ b/pom.xml
@@ -36,7 +36,7 @@
org.apache.kafka
kafka-clients
- 3.4.0
+ 3.8.0
diff --git a/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java b/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java
index 57aacb10..4200b017 100644
--- a/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java
+++ b/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java
@@ -64,6 +64,7 @@
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
/**
* A class that uses a SingleSubscriptionConsumer to remove the duplicate methods from the kafka
@@ -121,6 +122,12 @@ private SingleSubscriptionConsumer requireValidConsumer() {
return consumer.get();
}
+ @Override
+ public Uuid clientInstanceId(Duration timeout) {
+ // https://javadoc.io/static/org.apache.kafka/kafka-clients/3.8.0/org/apache/kafka/clients/consumer/KafkaConsumer.html#clientInstanceId-java.time.Duration-
+ throw new IllegalStateException("Pub/Sub Lite Kafka Connector does not support telemetry");
+ }
+
@Override
public Set assignment() {
return requireValidConsumer().assignment().stream()
diff --git a/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducer.java b/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducer.java
index 360aff61..d061ede2 100644
--- a/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducer.java
+++ b/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducer.java
@@ -48,6 +48,7 @@
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnsupportedVersionException;
class PubsubLiteProducer implements Producer {
@@ -77,6 +78,12 @@ public void failed(State from, Throwable failure) {
this.publisher.startAsync().awaitRunning();
}
+ @Override
+ public Uuid clientInstanceId(Duration timeout) {
+ // https://javadoc.io/static/org.apache.kafka/kafka-clients/3.8.0/org/apache/kafka/clients/consumer/KafkaConsumer.html#clientInstanceId-java.time.Duration-
+ throw new IllegalStateException("Pub/Sub Lite Kafka Connector does not support telemetry");
+ }
+
@Override
public void initTransactions() {
throw NO_TRANSACTIONS_EXCEPTION;