Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
add NPE check and unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoran10 committed Jul 24, 2023
1 parent 73a9a9c commit 5183d18
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
*/
package io.streamnative.pulsar.handlers.kop.storage;

import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import io.streamnative.pulsar.handlers.kop.SystemTopicClient;
import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicException;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -52,7 +54,8 @@ public class PulsarTopicProducerStateManagerSnapshotBuffer implements ProducerSt

private CompletableFuture<Void> currentReadHandle;

private synchronized CompletableFuture<Reader<ByteBuffer>> ensureReaderHandle() {
@VisibleForTesting
public synchronized CompletableFuture<Reader<ByteBuffer>> ensureReaderHandle() {
if (reader == null) {
CompletableFuture<Reader<ByteBuffer>> newReader = pulsarClient.newReaderBuilder()
.topic(topic)
Expand Down Expand Up @@ -90,7 +93,8 @@ private synchronized void discardReader(Reader<ByteBuffer> oldReader) {
}
}

private synchronized CompletableFuture<Producer<ByteBuffer>> ensureProducerHandle() {
@VisibleForTesting
public synchronized CompletableFuture<Producer<ByteBuffer>> ensureProducerHandle() {
if (producer == null) {
CompletableFuture<Producer<ByteBuffer>> newProducer = pulsarClient.newProducerBuilder()
.enableBatching(false)
Expand Down Expand Up @@ -160,6 +164,10 @@ private synchronized CompletableFuture<Void> ensureLatestData(boolean beforeWrit
// please note that the read operation is async,
// and it is not execute inside this synchronized block
CompletableFuture<Reader<ByteBuffer>> readerHandle = ensureReaderHandle();
if (readerHandle == null) {
return CompletableFuture.failedFuture(
new KoPTopicException("Failed to create reader handle for " + topic));
}
final CompletableFuture<Void> newReadHandle =
readerHandle.thenCompose(this::readNextMessageIfAvailable);
currentReadHandle = newReadHandle;
Expand Down Expand Up @@ -188,7 +196,12 @@ public CompletableFuture<Void> write(ProducerStateManagerSnapshot snapshot) {
// cannot serialise, skip
return CompletableFuture.completedFuture(null);
}
return ensureProducerHandle().thenCompose(opProducer -> {
CompletableFuture<Producer<ByteBuffer>> producerFuture = ensureProducerHandle();
if (producerFuture == null) {
return CompletableFuture.failedFuture(
new KoPTopicException("Failed to create producer handle for " + topic));
}
return producerFuture.thenCompose(opProducer -> {
// nobody can write now to the topic
// wait for local cache to be up-to-date
return ensureLatestData(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,26 @@
*/
package io.streamnative.pulsar.handlers.kop.storage;

import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.streamnative.pulsar.handlers.kop.SystemTopicClient;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.testng.annotations.Test;

/**
Expand Down Expand Up @@ -69,4 +78,48 @@ public void testSerializeAndDeserialize() {
}
}

@Test(timeOut = 5_000)
public void ensureReaderHandleCaughtExceptionTest() {
SystemTopicClient sysTopicClient = spy(new SystemTopicClient(pulsar, conf));
ReaderBuilder<ByteBuffer> readerBuilder = spy(sysTopicClient.newReaderBuilder());
when(readerBuilder.createAsync()).thenReturn(CompletableFuture.failedFuture(new RuntimeException("inject")));
when(sysTopicClient.newReaderBuilder()).thenReturn(readerBuilder);

PulsarTopicProducerStateManagerSnapshotBuffer snapshotBuffer =
new PulsarTopicProducerStateManagerSnapshotBuffer("snapshot-test-topic", sysTopicClient);
CompletableFuture<Reader<ByteBuffer>> readerFuture = snapshotBuffer.ensureReaderHandle();
if (readerFuture != null) {
try {
readerFuture.get();
fail("should fail");
} catch (Exception e) {
assertEquals(e.getCause().getMessage(), "inject");
}
} else {
log.info("This is expected behavior.");
}
}

@Test(timeOut = 5_000)
public void ensureProducerCaughtExceptionTest() {
SystemTopicClient sysTopicClient = spy(new SystemTopicClient(pulsar, conf));
ProducerBuilder<ByteBuffer> producerBuilder = spy(sysTopicClient.newProducerBuilder());
when(producerBuilder.createAsync()).thenReturn(CompletableFuture.failedFuture(new RuntimeException("inject")));
when(sysTopicClient.newProducerBuilder()).thenReturn(producerBuilder);

PulsarTopicProducerStateManagerSnapshotBuffer snapshotBuffer =
new PulsarTopicProducerStateManagerSnapshotBuffer("snapshot-test-topic", sysTopicClient);
CompletableFuture<Producer<ByteBuffer>> producerFuture = snapshotBuffer.ensureProducerHandle();
if (producerFuture != null) {
try {
producerFuture.get();
fail("should fail");
} catch (Exception e) {
assertEquals(e.getCause().getMessage(), "inject");
}
} else {
log.info("This is expected behavior.");
}
}

}

0 comments on commit 5183d18

Please sign in to comment.