From ac393f2550958c110a5d0aa3022f5dad6366811b Mon Sep 17 00:00:00 2001 From: Jorgen Ringen Date: Tue, 10 Nov 2020 23:16:30 +0100 Subject: [PATCH] feature: Choose between Consumer commit or Producer transactional commits - Choose either Consumer sync or async commits - Fixes #25 https://github.com/confluentinc/parallel-consumer/issues/25: -- Sometimes a a transaction error occurs - Cannot call send in state COMMITTING_TRANSACTION #25 - ReentrantReadWrite lock protects non-thread safe transactional producer from incorrect multithreaded use - Wider lock to prevent transaction's containing produced messages that they shouldn't - Implement non transactional synchronous commit sync properly - Select tests adapted to non transactional as well - Must start tx in MockProducer as well - Adds supervision to poller - Fixes a performance issue with the async committer not being woken up - Enhances tests to run under multiple commit modes - Fixes example app tests - incorrectly testing wrong thing and MockProducer not configured to auto complete - Make committer thread revoke partitions and commit - Have onPartitionsRevoked be responsible for committing on close, instead of an explicit call to commit by controller - Make sure Broker Poller now drains properly, committing any waiting work - Add missing revoke flow to MockConsumer wrapper - Add missing latch timeout check --- .gitignore | 5 +- .idea/runConfigurations/All_examples.xml | 2 +- README.adoc | 59 ++-- .../io/confluent/csid/utils/StringUtils.java | 8 +- .../AbstractOffsetCommitter.java | 59 ++++ .../parallelconsumer/BrokerPollSystem.java | 192 +++++++---- .../parallelconsumer/ConsumerManager.java | 114 +++++++ .../ConsumerOffsetCommitter.java | 203 ++++++++++++ .../ErrorInUserFunctionException.java | 10 + .../parallelconsumer/InternalError.java | 20 ++ .../parallelconsumer/OffsetCommitter.java | 5 + .../ParallelConsumerOptions.java | 82 ++++- .../ParallelEoSStreamProcessor.java | 313 ++++++++---------- .../parallelconsumer/ProducerManager.java | 311 +++++++++++++++++ .../parallelconsumer/UserFunctions.java | 38 +++ .../parallelconsumer/WorkManager.java | 53 +-- .../CloseAndOpenOffsetTest.java | 85 ++++- .../integrationTests/KafkaTest.java | 36 +- .../integrationTests/LoadTest.java | 6 +- .../TransactionAndCommitModeTest.java | 143 ++++++++ .../integrationTests/VolumeTests.java | 116 ++++--- .../utils/KafkaClientUtils.java | 7 +- .../confluent/csid/utils/KafkaTestUtils.java | 44 ++- .../csid/utils/LongPollingMockConsumer.java | 92 ++++- .../ParallelEoSStreamProcessorTest.java | 176 ++++++---- .../ParallelEoSStreamProcessorTestBase.java | 142 ++++++-- .../parallelconsumer/WorkManagerTest.java | 6 +- .../test/resources/junit-platform.properties | 2 +- .../src/test/resources/logback-test.xml | 23 +- .../examples/core/CoreApp.java | 50 +-- .../examples/core/CoreAppTest.java | 31 +- ...logback-test.xml => logback-temp-test.xml} | 2 +- .../examples/streams/StreamsApp.java | 11 +- .../examples/streams/StreamsAppTest.java | 2 +- .../parallel-consumer-example-vertx/pom.xml | 13 + .../examples/vertx/VertxApp.java | 12 +- .../examples/vertx/VertxAppTest.java | 29 +- ...StreamVertxParallelEoSStreamProcessor.java | 10 +- .../VertxParallelEoSStreamProcessor.java | 16 +- .../parallelconsumer/vertx/VertxTest.java | 12 +- src/docs/README.adoc | 29 +- 41 files changed, 2036 insertions(+), 533 deletions(-) create mode 100644 parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/AbstractOffsetCommitter.java create mode 100644 parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ConsumerManager.java create mode 100644 parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ConsumerOffsetCommitter.java create mode 100644 parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ErrorInUserFunctionException.java create mode 100644 parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/InternalError.java create mode 100644 parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/OffsetCommitter.java create mode 100644 parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ProducerManager.java create mode 100644 parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/UserFunctions.java create mode 100644 parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/TransactionAndCommitModeTest.java rename parallel-consumer-examples/parallel-consumer-example-core/src/test/resources/{logback-test.xml => logback-temp-test.xml} (94%) diff --git a/.gitignore b/.gitignore index f5833c742..05e3ba15b 100644 --- a/.gitignore +++ b/.gitignore @@ -29,4 +29,7 @@ hs_err_pid* *.iml target/ .DS_Store -*.versionsBackup \ No newline at end of file +*.versionsBackup + +# JENV +.java-version \ No newline at end of file diff --git a/.idea/runConfigurations/All_examples.xml b/.idea/runConfigurations/All_examples.xml index ae2384feb..8c82d61b5 100644 --- a/.idea/runConfigurations/All_examples.xml +++ b/.idea/runConfigurations/All_examples.xml @@ -7,7 +7,7 @@