Skip to content

Commit

Permalink
SinkContext: ability to seek/pause/resume consumer for a topic (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
dlg99 authored and ciaocloud committed Oct 16, 2021
1 parent a156977 commit b5ca8c9
Show file tree
Hide file tree
Showing 8 changed files with 321 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Summary;
import java.nio.ByteBuffer;
import java.util.Arrays;
Expand All @@ -33,13 +32,16 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import lombok.ToString;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageId;
Expand All @@ -50,8 +52,11 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.functions.ExternalPulsarConfig;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Record;
Expand Down Expand Up @@ -113,6 +118,9 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable

private boolean exposePulsarAdminClientEnabled;

private List<Consumer<?>> inputConsumers;
private final Map<TopicName, Consumer> topicConsumers = new ConcurrentHashMap<>();

static {
// add label to indicate user metric
userMetricsLabelNames = Arrays.copyOf(ComponentStatsManager.metricsLabelNames, ComponentStatsManager.metricsLabelNames.length + 1);
Expand Down Expand Up @@ -706,4 +714,80 @@ public void close() {
logger.warn("Failed to close producers", e);
}
}

@Override
public void seek(String topic, int partition, MessageId messageId) throws PulsarClientException {
Consumer<?> consumer = getConsumer(topic, partition);
consumer.seek(messageId);
}

@Override
public void pause(String topic, int partition) throws PulsarClientException {
getConsumer(topic, partition).pause();
}

@Override
public void resume(String topic, int partition) throws PulsarClientException {
getConsumer(topic, partition).resume();
}

public void setInputConsumers(List<Consumer<?>> inputConsumers) {
this.inputConsumers = inputConsumers;
inputConsumers.stream()
.flatMap(consumer ->
consumer instanceof MultiTopicsConsumerImpl
? ((MultiTopicsConsumerImpl<?>) consumer).getConsumers().stream()
: Stream.of(consumer))
.forEach(consumer -> topicConsumers.putIfAbsent(TopicName.get(consumer.getTopic()), consumer));
}

private void reloadConsumersFromMultiTopicsConsumers() {
// MultiTopicsConsumer in the list of inputConsumers could change its nested consumers
// if ne partition was created or a new topic added that matches subscription pattern.
// Let's update topicConsumers map to match.
inputConsumers
.stream()
.flatMap(c ->
c instanceof MultiTopicsConsumerImpl
? ((MultiTopicsConsumerImpl<?>) c).getConsumers().stream()
: Stream.empty() // no changes expected in regular consumers
).forEach(c -> topicConsumers.putIfAbsent(TopicName.get(c.getTopic()), c));
}

// returns null if consumer not found
private Consumer<?> tryGetConsumer(String topic, int partition) {
if (partition == 0) {
// maybe a non-partitioned topic
Consumer<?> consumer = topicConsumers.get(TopicName.get(topic));

if (consumer != null) {
return consumer;
}
}
// maybe partitioned topic
return topicConsumers.get(TopicName.get(topic).getPartition(partition));
}

@VisibleForTesting
Consumer<?> getConsumer(String topic, int partition) throws PulsarClientException {
if (inputConsumers == null) {
throw new PulsarClientException("Getting consumer is not supported");
}

Consumer<?> consumer = tryGetConsumer(topic, partition);
if (consumer == null) {
// MultiTopicsConsumer's list of consumers could change
// if partitions changed or pattern(s) used to subscribe.
// Reload and try one more time.
reloadConsumersFromMultiTopicsConsumers();
consumer = tryGetConsumer(topic, partition);
}

if (consumer != null) {
return consumer;
}
throw new PulsarClientException("Consumer for topic " + topic
+ " partition " + partition + " is not found");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.pulsar.functions.sink.PulsarSinkDisable;
import org.apache.pulsar.functions.source.MultiConsumerPulsarSourceConfig;
import org.apache.pulsar.functions.source.MultiConsumerPulsarSource;
import org.apache.pulsar.functions.source.PulsarSource;
import org.apache.pulsar.functions.source.PulsarSourceConfig;
import org.apache.pulsar.functions.source.SingleConsumerPulsarSource;
import org.apache.pulsar.functions.source.SingleConsumerPulsarSourceConfig;
Expand Down Expand Up @@ -729,6 +730,9 @@ private void setupInput(ContextImpl contextImpl) throws Exception {
this.source.open(ObjectMapperFactory.getThreadLocal().readValue(sourceSpec.getConfigs(),
new TypeReference<Map<String, Object>>() {}), contextImpl);
}
if (this.source instanceof PulsarSource) {
contextImpl.setInputConsumers(((PulsarSource) this.source).getInputConsumers());
}
} catch (Exception e) {
log.error("Source open produced uncaught exception: ", e);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.functions.source;

import com.google.common.annotations.VisibleForTesting;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
Expand All @@ -41,7 +40,7 @@ public class MultiConsumerPulsarSource<T> extends PushPulsarSource<T> implements

private final MultiConsumerPulsarSourceConfig pulsarSourceConfig;
private final ClassLoader functionClassLoader;
private List<Consumer<T>> inputConsumers = new LinkedList<>();
private final List<Consumer<T>> inputConsumers = new LinkedList<>();

public MultiConsumerPulsarSource(PulsarClient pulsarClient,
MultiConsumerPulsarSourceConfig pulsarSourceConfig,
Expand Down Expand Up @@ -107,8 +106,8 @@ private Map<String, PulsarSourceConsumerConfig<T>> setupConsumerConfigs() throws
return configs;
}

@VisibleForTesting
List<Consumer<T>> getInputConsumers() {
@Override
public List<Consumer<T>> getInputConsumers() {
return inputConsumers;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.utils.CryptoUtils;
import org.apache.pulsar.io.core.Source;
Expand All @@ -36,6 +37,7 @@
import java.security.Security;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

Expand All @@ -57,6 +59,8 @@ protected PulsarSource(PulsarClient pulsarClient,
this.functionClassLoader = functionClassLoader;
}

public abstract List<Consumer<T>> getInputConsumers();

protected ConsumerBuilder<T> createConsumeBuilder(String topic, PulsarSourceConsumerConfig conf) {

ConsumerBuilder<T> cb = pulsarClient.newConsumer(conf.getSchema())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.SourceContext;

import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import static com.google.common.base.Preconditions.checkArgument;
Expand All @@ -41,6 +43,7 @@ public class SingleConsumerPulsarSource<T> extends PulsarSource<T> {
private final ClassLoader functionClassLoader;
private final TopicSchema topicSchema;
private Consumer<T> consumer;
private final List<Consumer<T>> inputConsumers = new LinkedList<>();

public SingleConsumerPulsarSource(PulsarClient pulsarClient,
SingleConsumerPulsarSourceConfig pulsarSourceConfig,
Expand Down Expand Up @@ -71,6 +74,7 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws

ConsumerBuilder<T> cb = createConsumeBuilder(topic, pulsarSourceConsumerConfig);
consumer = cb.subscribeAsync().join();
inputConsumers.add(consumer);
}

@Override
Expand All @@ -84,6 +88,11 @@ Consumer<T> getInputConsumer() {
return consumer;
}

@Override
public List<Consumer<T>> getInputConsumers() {
return inputConsumers;
}

@Override
public void close() throws Exception {
if (consumer != null ) {
Expand Down
Loading

0 comments on commit b5ca8c9

Please sign in to comment.