|
33 | 33 | import com.rabbitmq.stream.OffsetSpecification; |
34 | 34 | import com.rabbitmq.stream.impl.Client.ClientParameters; |
35 | 35 | import com.rabbitmq.stream.impl.Client.ConsumerUpdateListener; |
36 | | -import com.rabbitmq.stream.impl.Client.CreditNotification; |
37 | 36 | import com.rabbitmq.stream.impl.Client.MessageListener; |
38 | 37 | import com.rabbitmq.stream.impl.Client.Response; |
39 | 38 | import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast; |
@@ -534,20 +533,10 @@ void superStreamRebalancingShouldWorkWhilePublishing(TestInfo info) throws Excep |
534 | 533 | lastDispatchedOffset.set(offset); |
535 | 534 | receivedMessagesLatch.countDown(); |
536 | 535 | }; |
537 | | - AtomicInteger creditNotificationResponseCode = new AtomicInteger(); |
538 | | - // we keep track of credit errors |
539 | | - // with the amount of initial credit and the rebalancing, |
540 | | - // the first subscriber is likely to have in-flight credit commands |
541 | | - // when it becomes inactive. The server should then send some credit |
542 | | - // notifications to tell the client it's not supposed to ask for credits |
543 | | - // for this subscription. |
544 | | - CreditNotification creditNotification = |
545 | | - (subscriptionId, responseCode) -> creditNotificationResponseCode.set(responseCode); |
546 | 536 | ClientParameters clientParameters = |
547 | 537 | new ClientParameters() |
548 | 538 | .chunkListener(TestUtils.credit()) |
549 | 539 | .messageListener(messageListener) |
550 | | - .creditNotification(creditNotification) |
551 | 540 | .consumerUpdateListener(consumerUpdateListener); |
552 | 541 | Client client1 = cf.get(clientParameters); |
553 | 542 | Map<String, String> subscriptionProperties = new HashMap<>(); |
@@ -584,10 +573,6 @@ void superStreamRebalancingShouldWorkWhilePublishing(TestInfo info) throws Excep |
584 | 573 |
|
585 | 574 | waitAtMost(() -> consumerStates.get(b(1))); |
586 | 575 |
|
587 | | - waitAtMost( |
588 | | - () -> |
589 | | - creditNotificationResponseCode.get() == Constants.RESPONSE_CODE_PRECONDITION_FAILED); |
590 | | - |
591 | 576 | Response response = client1.unsubscribe(b(0)); |
592 | 577 | assertThat(response).is(ok()); |
593 | 578 | response = client2.unsubscribe(b(1)); |
|
0 commit comments