
Alternative API with KStream on top of Processor#5

Merged
Guozhang Wang (guozhangwang) merged 18 commits into confluentinc:streaming from guozhangwang:S-Processor
Aug 11, 2015

Conversation

@guozhangwang

Jay Kreps (@jkreps) Neha Narkhede (@nehanarkhede) ymatsuda

This is a second attempt at KStream-on-top-of-Processor: I kept the generics on KafkaProcessor and made addProcessor accept the parent object directly.

Do you allow processors to be added after the topology has been built?

Not sure we need a default Processor implementation.

@jkreps

I like the separation of layers a lot. I left some minor comments. At a high level, I am still stuck on the end-user interface being a little awkward. Currently, I think, the user defines a class that builds a topology and then passes that class in. I think this is a bit weird.

I expect to do something like this:

    Topology myTopology = new TopologyBuilder(defaultDeser)
        .addProcessor("my-processor", MyProcessor.class, new Source("my-source"))
        .addProcessor("my-other-processor", MyOtherProcessor.class, new Source("other-source", customDeser, customDeser), "my-processor");
    KafkaStreaming streaming = new KafkaStreaming(config, myTopology);
    streaming.run();
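To make the shape of that builder concrete, here is a minimal, self-contained sketch. Everything in it is hypothetical: `SketchTopologyBuilder` and `SketchTopology` are illustrative names rather than types from this PR, and sources are reduced to plain parent names instead of `Source` objects with deserializers. It only shows a fluent `addProcessor(name, class, parents...)` whose `build()` returns a snapshot, so processors added afterwards do not change a topology that has already been built.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a built topology: an immutable view of the graph.
final class SketchTopology {
    private final Map<String, Class<?>> processorClasses;
    private final Map<String, List<String>> parents;

    SketchTopology(Map<String, Class<?>> processorClasses, Map<String, List<String>> parents) {
        // Copy the maps so later builder calls cannot leak into this snapshot.
        this.processorClasses = Collections.unmodifiableMap(new LinkedHashMap<>(processorClasses));
        this.parents = Collections.unmodifiableMap(new LinkedHashMap<>(parents));
    }

    // Returns the processor class registered under this name, or null if unknown.
    Class<?> processorClass(String name) {
        return processorClasses.get(name);
    }

    // Returns the parent names of this processor, or null if unknown.
    List<String> parentsOf(String name) {
        return parents.get(name);
    }
}

// Hypothetical fluent builder: each call registers a processor class by name
// together with the names of its parents (sources or earlier processors).
final class SketchTopologyBuilder {
    private final Map<String, Class<?>> processorClasses = new LinkedHashMap<>();
    private final Map<String, List<String>> parents = new LinkedHashMap<>();

    SketchTopologyBuilder addProcessor(String name, Class<?> processorClass, String... parentNames) {
        if (processorClasses.containsKey(name)) {
            throw new IllegalArgumentException("Processor already added: " + name);
        }
        processorClasses.put(name, processorClass);
        parents.put(name, Collections.unmodifiableList(new ArrayList<>(Arrays.asList(parentNames))));
        return this;
    }

    SketchTopology build() {
        return new SketchTopology(processorClasses, parents);
    }
}
```

With this shape, the user never subclasses anything to describe the graph: the chain of `addProcessor` calls is the topology definition, and the result of `build()` is what would be handed to the runtime.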

I think this ties back to the question of whether I pass processor instances or classes. I think I may understand part of the confusion on this issue. One thing I am assuming is that the user will pass fully instantiated objects to the processors, and that these would be available in the init method.

So the implementation of KStream.filter currently looks like this:

    public KStream<K, V> filter(Predicate<K, V> predicate) {
        KStreamFilter<K, V> filter = new KStreamFilter<>(predicate);
        topology.addProcessor(filter, processor);
        return new KStreamImpl<>(topology, filter);
    }

What I am proposing is that it look instead like this:

    public KStream<K, V> filter(Predicate<K, V> predicate) {
        KStreamFilter<K, V> filter = new KStreamFilter<>(predicate);
        topology.addProcessor(KStreamFilter.class, new Configs("predicate", predicate));
        return this;
    }

The advantage is that the user code can now get rid of the whole Topology class with the builder. I think the order of execution for that API is quite unintuitive.
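The class-plus-configs idea can be sketched in isolation as follows. This is an illustration under stated assumptions, not the PR's code: `SketchProcessor`, `SketchFilter`, and `SketchInstantiator` are hypothetical names, the configs are modeled as a plain `Map<String, Object>` rather than the `Configs` type used above, and `java.util.function.BiPredicate` stands in for the `Predicate<K, V>` interface. The point it shows is that the runtime, not the user, constructs the processor from its class and then hands it the already-instantiated objects through `init`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical processor contract: created via a no-arg constructor,
// then configured through init() with user-supplied objects.
interface SketchProcessor {
    void init(Map<String, Object> configs);

    // Returns true if the record should be forwarded downstream.
    boolean process(Object key, Object value);
}

// Hypothetical filter that receives its predicate through init()
// instead of through its constructor.
final class SketchFilter implements SketchProcessor {
    private BiPredicate<Object, Object> predicate;

    @Override
    @SuppressWarnings("unchecked")
    public void init(Map<String, Object> configs) {
        Object p = configs.get("predicate");
        if (!(p instanceof BiPredicate)) {
            throw new IllegalArgumentException("Config 'predicate' must be a BiPredicate");
        }
        predicate = (BiPredicate<Object, Object>) p;
    }

    @Override
    public boolean process(Object key, Object value) {
        return predicate.test(key, value);
    }
}

// Hypothetical helper playing the role of the runtime: it instantiates
// the processor class reflectively and then calls init() with the configs.
final class SketchInstantiator {
    static <P extends SketchProcessor> P create(Class<P> cls, Map<String, Object> configs) {
        try {
            P processor = cls.getDeclaredConstructor().newInstance();
            processor.init(configs);
            return processor;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + cls.getName(), e);
        }
    }
}
```

Each call to `create` yields a separate processor instance that shares the same user-supplied predicate object, which is the trade-off of this design: the processor class needs a no-arg constructor, and anything placed in the configs is shared across every instance built from them.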

Guozhang Wang (guozhangwang) added a commit that referenced this pull request Aug 11, 2015
Alternative API with KStream on top of Processor
@guozhangwang Guozhang Wang (guozhangwang) merged commit 68d055c into confluentinc:streaming Aug 11, 2015
@ymatsuda

I am not convinced that we need this complexity.

Let's remove RoundRobinChooser.

@guozhangwang Guozhang Wang (guozhangwang) deleted the S-Processor branch October 7, 2016 21:45
Andrew Egelhofer (andrewegel) pushed a commit that referenced this pull request Nov 16, 2021
…projects (#5)

ce-trunk had several failing tests due to bringing ce-broker-plugins and ce-security into the
main ce-kafka project.