From b5ca8c9f31cc00cca498e57525d5a16aefb8781b Mon Sep 17 00:00:00 2001 From: Andrey Yegorov <8622884+dlg99@users.noreply.github.com> Date: Mon, 17 May 2021 23:19:03 -0700 Subject: [PATCH] SinkContext: ability to seek/pause/resume consumer for a topic (#10498) --- .../functions/instance/ContextImpl.java | 86 ++++++++- .../instance/JavaInstanceRunnable.java | 4 + .../source/MultiConsumerPulsarSource.java | 7 +- .../pulsar/functions/source/PulsarSource.java | 4 + .../source/SingleConsumerPulsarSource.java | 9 + .../functions/instance/ContextImplTest.java | 180 ++++++++++++++++-- .../functions/source/PulsarSourceTest.java | 25 +++ .../apache/pulsar/io/core/SinkContext.java | 31 +++ 8 files changed, 321 insertions(+), 25 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index ad3687762ba059..97aa538fe86792 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; -import io.prometheus.client.CollectorRegistry; import io.prometheus.client.Summary; import java.nio.ByteBuffer; import java.util.Arrays; @@ -33,13 +32,16 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import lombok.ToString; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.HashingScheme; import org.apache.pulsar.client.api.MessageId; @@ -50,8 +52,11 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.ProducerBuilderImpl; +import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.common.functions.ExternalPulsarConfig; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Record; @@ -113,6 +118,9 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable private boolean exposePulsarAdminClientEnabled; + private List> inputConsumers; + private final Map topicConsumers = new ConcurrentHashMap<>(); + static { // add label to indicate user metric userMetricsLabelNames = Arrays.copyOf(ComponentStatsManager.metricsLabelNames, ComponentStatsManager.metricsLabelNames.length + 1); @@ -706,4 +714,80 @@ public void close() { logger.warn("Failed to close producers", e); } } + + @Override + public void seek(String topic, int partition, MessageId messageId) throws PulsarClientException { + Consumer consumer = getConsumer(topic, partition); + consumer.seek(messageId); + } + + @Override + public void pause(String topic, int partition) throws PulsarClientException { + getConsumer(topic, partition).pause(); + } + + @Override + public void resume(String topic, int partition) throws PulsarClientException { + getConsumer(topic, partition).resume(); + } + + public void setInputConsumers(List> inputConsumers) { + this.inputConsumers = inputConsumers; + inputConsumers.stream() + .flatMap(consumer -> + consumer instanceof MultiTopicsConsumerImpl + ? ((MultiTopicsConsumerImpl) consumer).getConsumers().stream() + : Stream.of(consumer)) + .forEach(consumer -> topicConsumers.putIfAbsent(TopicName.get(consumer.getTopic()), consumer)); + } + + private void reloadConsumersFromMultiTopicsConsumers() { + // MultiTopicsConsumer in the list of inputConsumers could change its nested consumers + // if ne partition was created or a new topic added that matches subscription pattern. + // Let's update topicConsumers map to match. + inputConsumers + .stream() + .flatMap(c -> + c instanceof MultiTopicsConsumerImpl + ? ((MultiTopicsConsumerImpl) c).getConsumers().stream() + : Stream.empty() // no changes expected in regular consumers + ).forEach(c -> topicConsumers.putIfAbsent(TopicName.get(c.getTopic()), c)); + } + + // returns null if consumer not found + private Consumer tryGetConsumer(String topic, int partition) { + if (partition == 0) { + // maybe a non-partitioned topic + Consumer consumer = topicConsumers.get(TopicName.get(topic)); + + if (consumer != null) { + return consumer; + } + } + // maybe partitioned topic + return topicConsumers.get(TopicName.get(topic).getPartition(partition)); + } + + @VisibleForTesting + Consumer getConsumer(String topic, int partition) throws PulsarClientException { + if (inputConsumers == null) { + throw new PulsarClientException("Getting consumer is not supported"); + } + + Consumer consumer = tryGetConsumer(topic, partition); + if (consumer == null) { + // MultiTopicsConsumer's list of consumers could change + // if partitions changed or pattern(s) used to subscribe. + // Reload and try one more time. + reloadConsumersFromMultiTopicsConsumers(); + consumer = tryGetConsumer(topic, partition); + } + + if (consumer != null) { + return consumer; + } + throw new PulsarClientException("Consumer for topic " + topic + + " partition " + partition + " is not found"); + } + } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 3b9285cfea445a..68b396ea753587 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -68,6 +68,7 @@ import org.apache.pulsar.functions.sink.PulsarSinkDisable; import org.apache.pulsar.functions.source.MultiConsumerPulsarSourceConfig; import org.apache.pulsar.functions.source.MultiConsumerPulsarSource; +import org.apache.pulsar.functions.source.PulsarSource; import org.apache.pulsar.functions.source.PulsarSourceConfig; import org.apache.pulsar.functions.source.SingleConsumerPulsarSource; import org.apache.pulsar.functions.source.SingleConsumerPulsarSourceConfig; @@ -729,6 +730,9 @@ private void setupInput(ContextImpl contextImpl) throws Exception { this.source.open(ObjectMapperFactory.getThreadLocal().readValue(sourceSpec.getConfigs(), new TypeReference>() {}), contextImpl); } + if (this.source instanceof PulsarSource) { + contextImpl.setInputConsumers(((PulsarSource) this.source).getInputConsumers()); + } } catch (Exception e) { log.error("Source open produced uncaught exception: ", e); throw e; diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/MultiConsumerPulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/MultiConsumerPulsarSource.java index c3eee48262a682..a60b21d2df3f12 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/MultiConsumerPulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/MultiConsumerPulsarSource.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.functions.source; -import com.google.common.annotations.VisibleForTesting; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; @@ -41,7 +40,7 @@ public class MultiConsumerPulsarSource extends PushPulsarSource implements private final MultiConsumerPulsarSourceConfig pulsarSourceConfig; private final ClassLoader functionClassLoader; - private List> inputConsumers = new LinkedList<>(); + private final List> inputConsumers = new LinkedList<>(); public MultiConsumerPulsarSource(PulsarClient pulsarClient, MultiConsumerPulsarSourceConfig pulsarSourceConfig, @@ -107,8 +106,8 @@ private Map> setupConsumerConfigs() throws return configs; } - @VisibleForTesting - List> getInputConsumers() { + @Override + public List> getInputConsumers() { return inputConsumers; } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java index a6a944666adc2e..560e99afde7a9e 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java @@ -28,6 +28,7 @@ import org.apache.pulsar.client.impl.TopicMessageImpl; import org.apache.pulsar.common.functions.ConsumerConfig; import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.utils.CryptoUtils; import org.apache.pulsar.io.core.Source; @@ -36,6 +37,7 @@ import java.security.Security; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -57,6 +59,8 @@ protected PulsarSource(PulsarClient pulsarClient, this.functionClassLoader = functionClassLoader; } + public abstract List> getInputConsumers(); + protected ConsumerBuilder createConsumeBuilder(String topic, PulsarSourceConsumerConfig conf) { ConsumerBuilder cb = pulsarClient.newConsumer(conf.getSchema()) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SingleConsumerPulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SingleConsumerPulsarSource.java index 8320f0d2b52f8d..f39577e7c35d56 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SingleConsumerPulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SingleConsumerPulsarSource.java @@ -28,6 +28,8 @@ import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.core.SourceContext; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import static com.google.common.base.Preconditions.checkArgument; @@ -41,6 +43,7 @@ public class SingleConsumerPulsarSource extends PulsarSource { private final ClassLoader functionClassLoader; private final TopicSchema topicSchema; private Consumer consumer; + private final List> inputConsumers = new LinkedList<>(); public SingleConsumerPulsarSource(PulsarClient pulsarClient, SingleConsumerPulsarSourceConfig pulsarSourceConfig, @@ -71,6 +74,7 @@ public void open(Map config, SourceContext sourceContext) throws ConsumerBuilder cb = createConsumeBuilder(topic, pulsarSourceConsumerConfig); consumer = cb.subscribeAsync().join(); + inputConsumers.add(consumer); } @Override @@ -84,6 +88,11 @@ Consumer getInputConsumer() { return consumer; } + @Override + public List> getInputConsumers() { + return inputConsumers; + } + @Override public void close() throws Exception { if (consumer != null ) { diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java index 28d76999388de0..9a228bd996fd8e 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java @@ -18,36 +18,22 @@ */ package org.apache.pulsar.functions.instance; -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.same; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNull; - -import io.prometheus.client.CollectorRegistry; - -import java.nio.ByteBuffer; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; - import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.ProducerBase; import org.apache.pulsar.client.impl.ProducerBuilderImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.TypedMessageBuilderImpl; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.instance.state.BKStateStoreImpl; import org.apache.pulsar.functions.instance.state.InstanceStateManager; @@ -55,11 +41,32 @@ import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider; import org.apache.pulsar.io.core.SinkContext; +import org.assertj.core.util.Lists; +import org.mockito.Mockito; import org.slf4j.Logger; import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; + /** * Unit test {@link ContextImpl}. */ @@ -199,4 +206,137 @@ FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(), pulsarAdmin); context.getPulsarAdmin(); } + + @Test + public void testUnsupportedExtendedSinkContext(){ + config.setExposePulsarAdminClientEnabled(false); + context = new ContextImpl( + config, + logger, + client, + new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0], + FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(), + pulsarAdmin); + try { + context.seek("z", 0, Mockito.mock(MessageId.class)); + Assert.fail("Expected exception"); + } catch (PulsarClientException e) { + // pass + } + try { + context.pause("z", 0); + Assert.fail("Expected exception"); + } catch (PulsarClientException e) { + // pass + } + try { + context.resume("z", 0); + Assert.fail("Expected exception"); + } catch (PulsarClientException e) { + // pass + } + } + + @Test + public void testExtendedSinkContext() throws PulsarClientException { + config.setExposePulsarAdminClientEnabled(false); + context = new ContextImpl( + config, + logger, + client, + new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0], + FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(), + pulsarAdmin); + Consumer mockConsumer = Mockito.mock(Consumer.class); + when(mockConsumer.getTopic()).thenReturn(TopicName.get("z").toString()); + context.setInputConsumers(Lists.newArrayList(mockConsumer)); + + context.seek("z", 0, Mockito.mock(MessageId.class)); + Mockito.verify(mockConsumer, Mockito.times(1)).seek(any(MessageId.class)); + + context.pause("z", 0); + Mockito.verify(mockConsumer, Mockito.times(1)).pause(); + + context.resume("z", 0); + Mockito.verify(mockConsumer, Mockito.times(1)).resume(); + + try { + context.resume("unknown", 0); + Assert.fail("Expected exception"); + } catch (PulsarClientException e) { + // pass + } + } + + @Test + public void testGetConsumer() throws PulsarClientException { + config.setExposePulsarAdminClientEnabled(false); + context = new ContextImpl( + config, + logger, + client, + new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0], + FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(), + pulsarAdmin); + Consumer mockConsumer = Mockito.mock(Consumer.class); + when(mockConsumer.getTopic()).thenReturn(TopicName.get("z").toString()); + context.setInputConsumers(Lists.newArrayList(mockConsumer)); + + Assert.assertNotNull(context.getConsumer("z", 0)); + try { + context.getConsumer("z", 1); + Assert.fail("Expected exception"); + } catch (PulsarClientException e) { + // pass + } + } + + @Test + public void testGetConsumerMultiTopic() throws PulsarClientException { + config.setExposePulsarAdminClientEnabled(false); + context = new ContextImpl( + config, + logger, + client, + new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0], + FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(), + pulsarAdmin); + ConsumerImpl consumer1 = Mockito.mock(ConsumerImpl.class); + when(consumer1.getTopic()).thenReturn(TopicName.get("first").toString()); + ConsumerImpl consumer2 = Mockito.mock(ConsumerImpl.class); + when(consumer2.getTopic()).thenReturn(TopicName.get("second").toString()); + List> consumersList = Lists.newArrayList(consumer1, consumer2); + + MultiTopicsConsumerImpl mtc = Mockito.mock(MultiTopicsConsumerImpl.class); + when(mtc.getConsumers()).thenReturn(consumersList); + + context.setInputConsumers(Lists.newArrayList(mtc)); + + Assert.assertNotNull(context.getConsumer("first", 0)); + Assert.assertNotNull(context.getConsumer("second", 0)); + + try { + // nknown topic + context.getConsumer("third", 0); + Assert.fail("Expected exception"); + } catch (PulsarClientException e) { + // pass + } + + // consumers updated + ConsumerImpl consumer3 = Mockito.mock(ConsumerImpl.class); + when(consumer3.getTopic()).thenReturn(TopicName.get("third").toString()); + consumersList.add(consumer3); + + Assert.assertNotNull(context.getConsumer("third", 0)); + + try { + // unknown partition + context.getConsumer("third", 1); + Assert.fail("Expected exception"); + } catch (PulsarClientException e) { + // pass + } + + } } diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java index 5dfbd9a71693ac..8fed210f916ca1 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java @@ -32,6 +32,8 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import lombok.Cleanup; import lombok.Getter; @@ -52,10 +54,13 @@ import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.io.core.SourceContext; +import org.junit.Assert; import org.mockito.ArgumentMatcher; import static org.testng.Assert.assertSame; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -330,4 +335,24 @@ public void testPreserveOriginalSchema(PulsarSourceConfig pulsarSourceConfig) th Record pushed = pulsarSource.read(); assertSame(pushed.getSchema(), schema); } + + @Test(dataProvider = "sourceImpls") + public void testInputConsumersGetter(PulsarSourceConfig pulsarSourceConfig) throws Exception { + PulsarSource pulsarSource = getPulsarSource(pulsarSourceConfig); + pulsarSource.open(new HashMap<>(), null); + + if (pulsarSourceConfig instanceof SingleConsumerPulsarSourceConfig) { + SingleConsumerPulsarSourceConfig cfg = (SingleConsumerPulsarSourceConfig) pulsarSourceConfig; + Assert.assertEquals(1, pulsarSource.getInputConsumers().size()); + return; + } + + if (pulsarSourceConfig instanceof MultiConsumerPulsarSourceConfig) { + MultiConsumerPulsarSourceConfig cfg = (MultiConsumerPulsarSourceConfig) pulsarSourceConfig; + Assert.assertEquals(cfg.getTopicSchema().size(), pulsarSource.getInputConsumers().size()); + return; + } + + fail("Unknown config type"); + } } diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java index af5499e968ae3c..cbc3fd877b8dbb 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java @@ -21,6 +21,8 @@ import java.util.Collection; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; @@ -51,4 +53,33 @@ public interface SinkContext extends ConnectorContext { default SubscriptionType getSubscriptionType() { throw new UnsupportedOperationException("Context does not provide SubscriptionType"); } + + /** + * Reset the subscription associated with this topic and partition to a specific message id. + * @param topic - topic name + * @param partition - partition id (0 for non-partitioned topics) + * @param messageId to reset to + * @throws PulsarClientException + */ + default void seek(String topic, int partition, MessageId messageId) throws PulsarClientException { + throw new UnsupportedOperationException("not implemented"); + } + + /** + * Stop requesting new messages for given topic and partition until {@link #resume(String topic)} is called. + * @param topic - topic name + * @param partition - partition id (0 for non-partitioned topics) + */ + default void pause(String topic, int partition) throws PulsarClientException { + throw new UnsupportedOperationException("not implemented"); + } + + /** + * Resume requesting messages. + * @param topic - topic name + * @param partition - partition id (0 for non-partitioned topics) + */ + default void resume(String topic, int partition) throws PulsarClientException { + throw new UnsupportedOperationException("not implemented"); + } }