Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MockConsumer does not work #176

Closed
Tracked by #172
jagrutmehta opened this issue Nov 9, 2021 · 6 comments · Fixed by #470
Closed
Tracked by #172

MockConsumer does not work #176

jagrutmehta opened this issue Nov 9, 2021 · 6 comments · Fixed by #470
Assignees

Comments

@jagrutmehta
Copy link

jagrutmehta commented Nov 9, 2021

Parallel Consumer wrapper does not work with MockConsumer. It keeps on skipping messages with error message "Record in buffer for a partition no longer assigned. Dropping."

		
            mockconsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
	    HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
	    TopicPartition tp = new TopicPartition(topic, 0);
	    startOffsets.put(tp, 0L);
	    mockconsumer.updateBeginningOffsets(startOffsets);
             mockconsumer.schedulePollTask(() -> {
	          mockconsumer.subscribe(Arrays.asList(new String[] {topic}));
                  /* Tried this one as well
                  mockconsumer.subscribe(Arrays.asList(new String[] {topic}), (ParallelEoSStreamProcessor)eosStreamProcessor);
                  */
	    	mockconsumer.rebalance(Collections.singletonList(new TopicPartition(topic, 0)) );
	    });

... doesn't work without scheduled task also

            mockconsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
	    HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
	    TopicPartition tp = new TopicPartition(topic, 0);
	    startOffsets.put(tp, 0L);
	    mockconsumer.updateBeginningOffsets(startOffsets);
 	    mockconsumer.subscribe(Arrays.asList(new String[] {topic}));
            /* Tried this one as well
            mockconsumer.subscribe(Arrays.asList(new String[] {topic}), (ParallelEoSStreamProcessor)eosStreamProcessor);
            */
           mockconsumer.rebalance(Collections.singletonList(new TopicPartition(topic, 0)) );


Above code works fine without ParallelStreamProcessor and done polling directly on consumer.

@astubbs
Copy link
Contributor

astubbs commented Nov 15, 2021

Oh very interesting. Should be a straight forward fix. Can you sub a PR for a failing test?

@jagrutmehta
Copy link
Author

@astubbs You mean PR with new test using mockconsumer which is failing?

@astubbs
Copy link
Contributor

astubbs commented Dec 27, 2021

Yes, that shows the sort of use you're looking for.

@astubbs astubbs self-assigned this Jan 11, 2022
@astubbs astubbs added the enhancement New feature or request label Jan 11, 2022
@astubbs astubbs mentioned this issue Feb 1, 2022
64 tasks
@astubbs
Copy link
Contributor

astubbs commented Jul 15, 2022

Question - do you need a MockConsumer specifically, or would a MockParallelConsumer also work?

@jagrutmehta
Copy link
Author

Ideally MockConsumer. In my case actual consumer implementation is controlled by flag (ramp etc). But if it is becoming complicated then separate MockParallelConsumer can also work.

@astubbs
Copy link
Contributor

astubbs commented Nov 3, 2022

FYI just pushed #470 - this works fine as far as I can see, as expected. But using MockConsumer is difficult (because it's not a complete implementation). Use LongPollingMockConsumer instead.

Let me know if you still have questions.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants