handle null offsets from kafka connector (#53)
dlg99 authored Mar 31, 2022
1 parent 70f0fdd commit d40f0b5
Showing 2 changed files with 35 additions and 8 deletions.
PulsarOffsetBackingStore.java
@@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.commons.lang.StringUtils.isBlank;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
@@ -35,8 +36,6 @@
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
@@ -54,7 +53,6 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
private Map<ByteBuffer, ByteBuffer> data;
private PulsarClient client;
private String topic;
private String token;
private Producer<byte[]> producer;
private Reader<byte[]> reader;
private volatile CompletableFuture<Void> outstandingReadToEnd = null;
@@ -211,8 +209,17 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback) {
values.forEach((key, value) -> {
ByteBuf bb = Unpooled.wrappedBuffer(key);
byte[] keyBytes = ByteBufUtil.getBytes(bb);
bb = Unpooled.wrappedBuffer(value);
byte[] valBytes = ByteBufUtil.getBytes(bb);
byte[] valBytes = null;
if (value != null) {
bb = Unpooled.wrappedBuffer(value);
valBytes = ByteBufUtil.getBytes(bb);
} else {
// It does not actually matter if it is earliest or latest.
// The connector that provides null offsets works with the
// system that cannot seek to the offset anyway.
// Just need to store something to keep the offset store happy.
valBytes = MessageId.earliest.toByteArray();
}
producer.newMessage()
.key(new String(keyBytes, UTF_8))
.value(valBytes)
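
The heart of the change is the branch above: when the Kafka connector hands the store a null offset value, a serialized placeholder MessageId is persisted instead of dereferencing the null. A minimal standalone sketch of that substitution follows (hypothetical class and helper names, not part of the commit; assumes netty-buffer and pulsar-client on the classpath):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import org.apache.pulsar.client.api.MessageId;

// Hypothetical example, not part of this commit.
public class NullOffsetValueExample {

    // Mirrors the null check added in PulsarOffsetBackingStore#set:
    // a null offset value is replaced with a serialized placeholder
    // MessageId so there is always a non-null payload to persist.
    static byte[] toStorableBytes(ByteBuffer value) {
        if (value != null) {
            return ByteBufUtil.getBytes(Unpooled.wrappedBuffer(value));
        }
        // Which MessageId is stored does not matter; a source that
        // reports null offsets cannot seek to a stored offset anyway.
        return MessageId.earliest.toByteArray();
    }

    public static void main(String[] args) {
        byte[] placeholder = toStorableBytes(null);
        byte[] real = toStorableBytes(
                ByteBuffer.wrap("offset-1".getBytes(StandardCharsets.UTF_8)));
        System.out.println("placeholder length: " + placeholder.length);
        System.out.println("real value: " + new String(real, StandardCharsets.UTF_8));
    }
}
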
PulsarOffsetBackingStoreTest.java
@@ -32,18 +32,16 @@
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.util.Callback;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.mockito.Mockito.mock;

/**
* Test the implementation of {@link PulsarOffsetBackingStore}.
*/
@@ -91,6 +89,28 @@ public void testGetFromEmpty() throws Exception {
).get().isEmpty());
}

@Test(timeOut = 60000)
public void testGetSetNullValue() throws Exception {
Map<ByteBuffer, ByteBuffer> kvs = new HashMap<>();
ByteBuffer keyToSet = ByteBuffer.wrap(("test-key").getBytes(UTF_8));
kvs.put(keyToSet, null);
offsetBackingStore.set(kvs, null).get();

final List<ByteBuffer> keys = new ArrayList<>();
keys.add(keyToSet);

Map<ByteBuffer, ByteBuffer> result =
offsetBackingStore.get(keys).get();
assertEquals(1, result.size());

result.forEach((key, value) -> {
byte[] keyData = ByteBufUtil.getBytes(Unpooled.wrappedBuffer(key));
assertEquals(new String(keyData, UTF_8), "test-key");
byte[] valData = ByteBufUtil.getBytes(Unpooled.wrappedBuffer(value));
assertEquals(valData, MessageId.earliest.toByteArray());
});
}

@Test
public void testGetSet() throws Exception {
testGetSet(false);
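
The placeholder written for a null offset is a genuine serialized MessageId, so the bytes the new test reads back can still be decoded like any other stored value. A small hypothetical round-trip check, not part of the commit, illustrating that design choice:

import java.io.IOException;
import org.apache.pulsar.client.api.MessageId;

// Hypothetical example, not part of this commit.
public class PlaceholderRoundTrip {
    public static void main(String[] args) throws IOException {
        // Serialize the placeholder exactly as the offset store does for
        // null values, then decode it the way any MessageId consumer would.
        byte[] stored = MessageId.earliest.toByteArray();
        MessageId decoded = MessageId.fromByteArray(stored);
        System.out.println("decoded placeholder: " + decoded);
    }
}
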
