Skip to content

Commit caf7648

Browse files
authoredApr 20, 2022
Kafka Connect Adaptor: handle null offsets from kafka connector (#14916)
1 parent 5604dd5 commit caf7648

File tree

2 files changed

+34
-2
lines changed

2 files changed

+34
-2
lines changed
 

‎pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -213,8 +213,17 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb
213213
values.forEach((key, value) -> {
214214
ByteBuf bb = Unpooled.wrappedBuffer(key);
215215
byte[] keyBytes = ByteBufUtil.getBytes(bb);
216-
bb = Unpooled.wrappedBuffer(value);
217-
byte[] valBytes = ByteBufUtil.getBytes(bb);
216+
byte[] valBytes = null;
217+
if (value != null) {
218+
bb = Unpooled.wrappedBuffer(value);
219+
valBytes = ByteBufUtil.getBytes(bb);
220+
} else {
221+
// It does not actually matter if it is earliest or latest.
222+
// The connector that provides null offsets works with the
223+
// system that cannot seek to the offset anyway.
224+
// Just need to store something to keep the offset store happy.
225+
valBytes = MessageId.earliest.toByteArray();
226+
}
218227
producer.newMessage()
219228
.key(new String(keyBytes, UTF_8))
220229
.value(valBytes)

‎pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java

+23
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.concurrent.atomic.AtomicInteger;
3636
import lombok.extern.slf4j.Slf4j;
3737
import org.apache.kafka.connect.util.Callback;
38+
import org.apache.pulsar.client.api.MessageId;
3839
import org.apache.pulsar.client.api.ProducerConsumerBase;
3940
import org.apache.pulsar.client.api.PulsarClient;
4041
import org.testng.annotations.AfterMethod;
@@ -88,6 +89,28 @@ public void testGetFromEmpty() throws Exception {
8889
).get().isEmpty());
8990
}
9091

92+
@Test(timeOut = 60000)
93+
public void testGetSetNullValue() throws Exception {
94+
Map<ByteBuffer, ByteBuffer> kvs = new HashMap<>();
95+
ByteBuffer keyToSet = ByteBuffer.wrap(("test-key").getBytes(UTF_8));
96+
kvs.put(keyToSet, null);
97+
offsetBackingStore.set(kvs, null).get();
98+
99+
final List<ByteBuffer> keys = new ArrayList<>();
100+
keys.add(keyToSet);
101+
102+
Map<ByteBuffer, ByteBuffer> result =
103+
offsetBackingStore.get(keys).get();
104+
assertEquals(1, result.size());
105+
106+
result.forEach((key, value) -> {
107+
byte[] keyData = ByteBufUtil.getBytes(Unpooled.wrappedBuffer(key));
108+
assertEquals(new String(keyData, UTF_8), "test-key");
109+
byte[] valData = ByteBufUtil.getBytes(Unpooled.wrappedBuffer(value));
110+
assertEquals(valData, MessageId.earliest.toByteArray());
111+
});
112+
}
113+
91114
@Test
92115
public void testGetSet() throws Exception {
93116
testGetSet(false);

0 commit comments

Comments
 (0)
Please sign in to comment.