Skip to content

Commit

Permalink
Added ability for sources to publish messages on their own just like …
Browse files Browse the repository at this point in the history
…their function counterparts (apache#6941)

Co-authored-by: Sanjeev Kulkarni <sanjeevk@splunk.com>
  • Loading branch information
2 people authored and huangdx0726 committed Aug 24, 2020
1 parent 8e69beb commit 94eb982
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
Expand Down Expand Up @@ -179,6 +182,11 @@ public ByteBuffer getState(String key) {
public CompletableFuture<ByteBuffer> getStateAsync(String key) {
return null;
}

@Override
public <O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> schema) throws PulsarClientException {
return null;
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.apache.pulsar.io.core;

import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.slf4j.Logger;

import java.nio.ByteBuffer;
Expand Down Expand Up @@ -160,4 +163,15 @@ public interface SourceContext {
* @return the state value for the key.
*/
CompletableFuture<ByteBuffer> getStateAsync(String key);

/**
* New output message using schema for serializing to the topic
*
* @param topicName The name of the topic for output message
* @param schema provide a way to convert between serialized data and domain objects
* @param <O>
* @return the message builder instance
* @throws PulsarClientException
*/
<O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> schema) throws PulsarClientException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.kafka.KafkaAbstractSource;
import org.apache.pulsar.io.kafka.KafkaSourceConfig;
Expand Down Expand Up @@ -158,6 +161,11 @@ public ByteBuffer getState(String key) {
public CompletableFuture<ByteBuffer> getStateAsync(String key) {
return null;
}

@Override
public <O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> schema) throws PulsarClientException {
return null;
}
};
Map<String, Object> config = new HashMap<>();
ThrowingRunnable openAndClose = ()->{
Expand Down

0 comments on commit 94eb982

Please sign in to comment.