@@ -499,7 +499,7 @@ public void testRecordAck() throws Exception {
499499 public void testRecordAckMock () throws Exception {
500500 ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
501501 Consumer <Integer , String > consumer = mock (Consumer .class );
502- given (cf .createConsumer (isNull ( ), eq ("clientId" ), isNull ())).willReturn (consumer );
502+ given (cf .createConsumer (eq ( "grp" ), eq ("clientId" ), isNull ())).willReturn (consumer );
503503 final Map <TopicPartition , List <ConsumerRecord <Integer , String >>> records = new HashMap <>();
504504 records .put (new TopicPartition ("foo" , 0 ), Arrays .asList (
505505 new ConsumerRecord <>("foo" , 0 , 0L , 1 , "foo" ),
@@ -512,6 +512,7 @@ public void testRecordAckMock() throws Exception {
512512 TopicPartitionInitialOffset [] topicPartition = new TopicPartitionInitialOffset [] {
513513 new TopicPartitionInitialOffset ("foo" , 0 ) };
514514 ContainerProperties containerProps = new ContainerProperties (topicPartition );
515+ containerProps .setGroupId ("grp" );
515516 containerProps .setAckMode (AckMode .RECORD );
516517 final CountDownLatch latch = new CountDownLatch (2 );
517518 MessageListener <Integer , String > messageListener = spy (
@@ -565,7 +566,7 @@ public void testRecordAckMockForeignThreadImmediate() throws Exception {
565566 private void testRecordAckMockForeignThreadGuts (AckMode ackMode ) throws Exception {
566567 ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
567568 Consumer <Integer , String > consumer = mock (Consumer .class );
568- given (cf .createConsumer (isNull ( ), eq ("clientId" ), isNull ())).willReturn (consumer );
569+ given (cf .createConsumer (eq ( "grp" ), eq ("clientId" ), isNull ())).willReturn (consumer );
569570 final Map <TopicPartition , List <ConsumerRecord <Integer , String >>> records = new HashMap <>();
570571 records .put (new TopicPartition ("foo" , 0 ), Arrays .asList (
571572 new ConsumerRecord <>("foo" , 0 , 0L , 1 , "foo" ),
@@ -578,6 +579,7 @@ private void testRecordAckMockForeignThreadGuts(AckMode ackMode) throws Exceptio
578579 TopicPartitionInitialOffset [] topicPartition = new TopicPartitionInitialOffset [] {
579580 new TopicPartitionInitialOffset ("foo" , 0 ) };
580581 ContainerProperties containerProps = new ContainerProperties (topicPartition );
582+ containerProps .setGroupId ("grp" );
581583 containerProps .setAckMode (ackMode );
582584 final CountDownLatch latch = new CountDownLatch (2 );
583585 final List <Acknowledgment > acks = new ArrayList <>();
@@ -627,7 +629,7 @@ public void onMessage(ConsumerRecord<Integer, String> data, Acknowledgment ackno
627629 public void testNonResponsiveConsumerEvent () throws Exception {
628630 ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
629631 Consumer <Integer , String > consumer = mock (Consumer .class );
630- given (cf .createConsumer (isNull ( ), eq ("" ), isNull ())).willReturn (consumer );
632+ given (cf .createConsumer (eq ( "grp" ), eq ("" ), isNull ())).willReturn (consumer );
631633 final Map <TopicPartition , List <ConsumerRecord <Integer , String >>> records = new HashMap <>();
632634 records .put (new TopicPartition ("foo" , 0 ), Arrays .asList (
633635 new ConsumerRecord <>("foo" , 0 , 0L , 1 , "foo" ),
@@ -644,6 +646,7 @@ public void testNonResponsiveConsumerEvent() throws Exception {
644646 TopicPartitionInitialOffset [] topicPartition = new TopicPartitionInitialOffset [] {
645647 new TopicPartitionInitialOffset ("foo" , 0 ) };
646648 ContainerProperties containerProps = new ContainerProperties (topicPartition );
649+ containerProps .setGroupId ("grp" );
647650 containerProps .setNoPollThreshold (2.0f );
648651 containerProps .setPollTimeout (10 );
649652 containerProps .setMonitorInterval (1 );
@@ -1703,7 +1706,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
17031706 public void testPauseResume () throws Exception {
17041707 ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
17051708 Consumer <Integer , String > consumer = mock (Consumer .class );
1706- given (cf .createConsumer (isNull ( ), eq ("clientId" ), isNull ())).willReturn (consumer );
1709+ given (cf .createConsumer (eq ( "grp" ), eq ("clientId" ), isNull ())).willReturn (consumer );
17071710 final Map <TopicPartition , List <ConsumerRecord <Integer , String >>> records = new HashMap <>();
17081711 records .put (new TopicPartition ("foo" , 0 ), Arrays .asList (
17091712 new ConsumerRecord <>("foo" , 0 , 0L , 1 , "foo" ),
@@ -1735,6 +1738,7 @@ public void testPauseResume() throws Exception {
17351738 TopicPartitionInitialOffset [] topicPartition = new TopicPartitionInitialOffset [] {
17361739 new TopicPartitionInitialOffset ("foo" , 0 ) };
17371740 ContainerProperties containerProps = new ContainerProperties (topicPartition );
1741+ containerProps .setGroupId ("grp" );
17381742 containerProps .setAckMode (AckMode .RECORD );
17391743 containerProps .setClientId ("clientId" );
17401744 containerProps .setIdleEventInterval (100L );
@@ -1764,7 +1768,7 @@ else if (e instanceof ConsumerResumedEvent) {
17641768 public void testInitialSeek () throws Exception {
17651769 ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
17661770 Consumer <Integer , String > consumer = mock (Consumer .class );
1767- given (cf .createConsumer (isNull ( ), eq ("clientId" ), isNull ())).willReturn (consumer );
1771+ given (cf .createConsumer (eq ( "grp" ), eq ("clientId" ), isNull ())).willReturn (consumer );
17681772 ConsumerRecords <Integer , String > emptyRecords = new ConsumerRecords <>(Collections .emptyMap ());
17691773 final CountDownLatch latch = new CountDownLatch (1 );
17701774 given (consumer .poll (anyLong ())).willAnswer (i -> {
@@ -1781,6 +1785,7 @@ public void testInitialSeek() throws Exception {
17811785 new TopicPartitionInitialOffset ("foo" , 5 , SeekPosition .END ),
17821786 };
17831787 ContainerProperties containerProps = new ContainerProperties (topicPartition );
1788+ containerProps .setGroupId ("grp" );
17841789 containerProps .setAckMode (AckMode .RECORD );
17851790 containerProps .setClientId ("clientId" );
17861791 containerProps .setMessageListener ((MessageListener ) r -> { });
@@ -1868,7 +1873,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
18681873 public void testAckModeCount () throws Exception {
18691874 ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
18701875 Consumer <Integer , String > consumer = mock (Consumer .class );
1871- given (cf .createConsumer (isNull ( ), eq ("clientId" ), isNull ())).willReturn (consumer );
1876+ given (cf .createConsumer (eq ( "grp" ), eq ("clientId" ), isNull ())).willReturn (consumer );
18721877 TopicPartition topicPartition = new TopicPartition ("foo" , 0 );
18731878 final Map <TopicPartition , List <ConsumerRecord <Integer , String >>> records1 = new HashMap <>();
18741879 records1 .put (topicPartition , Arrays .asList (
@@ -1911,6 +1916,7 @@ public void testAckModeCount() throws Exception {
19111916 TopicPartitionInitialOffset [] topicPartitionOffset = new TopicPartitionInitialOffset [] {
19121917 new TopicPartitionInitialOffset ("foo" , 0 ) };
19131918 ContainerProperties containerProps = new ContainerProperties (topicPartitionOffset );
1919+ containerProps .setGroupId ("grp" );
19141920 containerProps .setAckMode (AckMode .COUNT );
19151921 containerProps .setAckCount (3 );
19161922 containerProps .setClientId ("clientId" );
0 commit comments