Skip to content

Commit

Permalink
Added ability to build consumers in functions and sources (apache#6954)
Browse files Browse the repository at this point in the history
Co-authored-by: Sanjeev Kulkarni <sanjeevk@splunk.com>
  • Loading branch information
2 people authored and merlimat committed May 19, 2020
1 parent 0f0c5ed commit 59d1805
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.nio.ByteBuffer;

import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
Expand Down Expand Up @@ -274,4 +275,14 @@ public interface Context {
* @throws PulsarClientException
*/
<O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> schema) throws PulsarClientException;

/**
* Create a ConsumerBuilder with the schema.
*
* @param schema provide a way to convert between serialized data and domain objects
* @param <O>
* @return the consumer builder instance
* @throws PulsarClientException
*/
<O> ConsumerBuilder<O> newConsumerBuilder(Schema<O> schema) throws PulsarClientException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class ContextImpl implements Context, SinkContext, SourceContext {
// Per Message related
private Record<?> record;

private PulsarClient client;
private Map<String, Producer<?>> publishProducers;
private ProducerBuilderImpl<?> producerBuilder;

Expand Down Expand Up @@ -100,6 +101,7 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
Table<ByteBuf, ByteBuf> stateTable) {
this.config = config;
this.logger = logger;
this.client = client;
this.publishProducers = new HashMap<>();
this.topicSchema = new TopicSchema(client);
this.statsManager = statsManager;
Expand Down Expand Up @@ -361,6 +363,11 @@ public <O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> s
return messageBuilder;
}

@Override
public <O> ConsumerBuilder<O> newConsumerBuilder(Schema<O> schema) throws PulsarClientException {
return this.client.newConsumer(schema);
}

public <O> CompletableFuture<Void> publish(String topicName, O object, Schema<O> schema) {
try {
return newOutputMessage(topicName, schema).value(object).sendAsync().thenApply(msgId -> null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
Expand Down Expand Up @@ -187,6 +188,11 @@ public CompletableFuture<ByteBuffer> getStateAsync(String key) {
public <O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> schema) throws PulsarClientException {
return null;
}

@Override
public <O> ConsumerBuilder<O> newConsumerBuilder(Schema<O> schema) throws PulsarClientException {
return null;
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.core;

import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
Expand Down Expand Up @@ -174,4 +175,14 @@ public interface SourceContext {
* @throws PulsarClientException
*/
<O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> schema) throws PulsarClientException;

/**
* Create a ConsumerBuilder with the schema.
*
* @param schema provide a way to convert between serialized data and domain objects
* @param <O>
* @return the consumer builder instance
* @throws PulsarClientException
*/
<O> ConsumerBuilder<O> newConsumerBuilder(Schema<O> schema) throws PulsarClientException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
Expand Down Expand Up @@ -166,6 +167,11 @@ public CompletableFuture<ByteBuffer> getStateAsync(String key) {
public <O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> schema) throws PulsarClientException {
return null;
}

@Override
public <O> ConsumerBuilder<O> newConsumerBuilder(Schema<O> schema) throws PulsarClientException {
return null;
}
};
Map<String, Object> config = new HashMap<>();
ThrowingRunnable openAndClose = ()->{
Expand Down

0 comments on commit 59d1805

Please sign in to comment.