1515package com .rabbitmq .stream .impl ;
1616
1717import static com .rabbitmq .stream .impl .Utils .*;
18+ import static java .util .stream .Collectors .toList ;
1819import static java .util .stream .Collectors .toSet ;
1920
2021import com .rabbitmq .stream .BackOffDelayPolicy ;
4950import org .slf4j .Logger ;
5051import org .slf4j .LoggerFactory ;
5152
52- class ProducersCoordinator {
53+ final class ProducersCoordinator implements AutoCloseable {
5354
5455 static final int MAX_PRODUCERS_PER_CLIENT = 256 ;
5556 static final int MAX_TRACKING_CONSUMERS_PER_CLIENT = 50 ;
@@ -67,18 +68,21 @@ class ProducersCoordinator {
6768 new DefaultExecutorServiceFactory (
6869 Runtime .getRuntime ().availableProcessors (), 10 , "rabbitmq-stream-producer-connection-" );
6970 private final Lock coordinatorLock = new ReentrantLock ();
71+ private final boolean forceLeader ;
7072
7173 ProducersCoordinator (
7274 StreamEnvironment environment ,
7375 int maxProducersByClient ,
7476 int maxTrackingConsumersByClient ,
7577 Function <ClientConnectionType , String > connectionNamingStrategy ,
76- ClientFactory clientFactory ) {
78+ ClientFactory clientFactory ,
79+ boolean forceLeader ) {
7780 this .environment = environment ;
7881 this .clientFactory = clientFactory ;
7982 this .maxProducersByClient = maxProducersByClient ;
8083 this .maxTrackingConsumersByClient = maxTrackingConsumersByClient ;
8184 this .connectionNamingStrategy = connectionNamingStrategy ;
85+ this .forceLeader = forceLeader ;
8286 }
8387
8488 Runnable registerProducer (StreamProducer producer , String reference , String stream ) {
@@ -105,9 +109,10 @@ Runnable registerTrackingConsumer(StreamConsumer consumer) {
105109 }
106110
107111 private Runnable registerAgentTracker (AgentTracker tracker , String stream ) {
108- Client .Broker broker = getBrokerForProducer (stream );
112+ List <BrokerWrapper > candidates = findCandidateNodes (stream , this .forceLeader );
113+ Broker broker = pickBroker (candidates );
109114
110- addToManager (broker , tracker );
115+ addToManager (broker , candidates , tracker );
111116
112117 if (DEBUG ) {
113118 return () -> {
@@ -125,7 +130,7 @@ private Runnable registerAgentTracker(AgentTracker tracker, String stream) {
125130 }
126131 }
127132
128- private void addToManager (Broker node , AgentTracker tracker ) {
133+ private void addToManager (Broker node , List < BrokerWrapper > candidates , AgentTracker tracker ) {
129134 ClientParameters clientParameters =
130135 environment
131136 .clientParametersCopy ()
@@ -153,7 +158,8 @@ private void addToManager(Broker node, AgentTracker tracker) {
153158 if (pickedManager == null ) {
154159 String name = keyForNode (node );
155160 LOGGER .debug ("Trying to create producer manager on {}" , name );
156- pickedManager = new ClientProducersManager (node , this .clientFactory , clientParameters );
161+ pickedManager =
162+ new ClientProducersManager (node , candidates , this .clientFactory , clientParameters );
157163 LOGGER .debug ("Created producer manager on {}, id {}" , name , pickedManager .id );
158164 }
159165 try {
@@ -192,11 +198,12 @@ private void addToManager(Broker node, AgentTracker tracker) {
192198 }
193199 }
194200
195- private Client .Broker getBrokerForProducer (String stream ) {
201+ // package protected for testing
202+ List <BrokerWrapper > findCandidateNodes (String stream , boolean forceLeader ) {
196203 Map <String , Client .StreamMetadata > metadata =
197204 this .environment .locatorOperation (
198205 namedFunction (c -> c .metadata (stream ), "Candidate lookup to publish to '%s'" , stream ));
199- if (metadata .size () == 0 || metadata .get (stream ) == null ) {
206+ if (metadata .isEmpty () || metadata .get (stream ) == null ) {
200207 throw new StreamDoesNotExistException (stream );
201208 }
202209
@@ -210,17 +217,34 @@ private Client.Broker getBrokerForProducer(String stream) {
210217 }
211218 }
212219
220+ List <BrokerWrapper > candidates = new ArrayList <>();
213221 Client .Broker leader = streamMetadata .getLeader ();
214- if (leader == null ) {
222+ if (leader == null && forceLeader ) {
215223 throw new IllegalStateException ("Not leader available for stream " + stream );
216224 }
217- LOGGER .debug (
218- "Using client on {}:{} to publish to {}" , leader .getHost (), leader .getPort (), stream );
225+ candidates .add (new BrokerWrapper (leader , true ));
219226
220- return leader ;
227+ if (!forceLeader && streamMetadata .hasReplicas ()) {
228+ candidates .addAll (
229+ streamMetadata .getReplicas ().stream ()
230+ .map (b -> new BrokerWrapper (b , false ))
231+ .collect (toList ()));
232+ }
233+
234+ LOGGER .debug ("Candidates to publish to {}: {}" , stream , candidates );
235+
236+ return Collections .unmodifiableList (candidates );
237+ }
238+
239+ static Broker pickBroker (List <BrokerWrapper > candidates ) {
240+ return candidates .stream ()
241+ .filter (BrokerWrapper ::isLeader )
242+ .findFirst ()
243+ .map (BrokerWrapper ::broker )
244+ .orElseThrow (() -> new IllegalStateException ("Not leader available" ));
221245 }
222246
223- void close () {
247+ public void close () {
224248 Iterator <ClientProducersManager > iterator = this .managers .iterator ();
225249 while (iterator .hasNext ()) {
226250 ClientProducersManager manager = iterator .next ();
@@ -568,7 +592,10 @@ private class ClientProducersManager implements Comparable<ClientProducersManage
568592 private final AtomicBoolean closed = new AtomicBoolean (false );
569593
570594 private ClientProducersManager (
571- Broker targetNode , ClientFactory cf , Client .ClientParameters clientParameters ) {
595+ Broker targetNode ,
596+ List <BrokerWrapper > candidates ,
597+ ClientFactory cf ,
598+ Client .ClientParameters clientParameters ) {
572599 this .id = managerIdSequence .getAndIncrement ();
573600 AtomicReference <String > nameReference = new AtomicReference <>();
574601 AtomicReference <Client > ref = new AtomicReference <>();
@@ -682,7 +709,7 @@ private ClientProducersManager(
682709 .metadataListener (metadataListener )
683710 .clientProperty ("connection_name" , connectionName ),
684711 keyForNode (targetNode ),
685- Collections . emptyList ( ));
712+ candidates . stream (). map ( BrokerWrapper :: broker ). collect ( toList () ));
686713 this .client = cf .client (connectionFactoryContext );
687714 this .node = Utils .brokerFromClient (this .client );
688715 this .name = keyForNode (this .node );
@@ -694,18 +721,19 @@ private ClientProducersManager(
694721
695722 private void assignProducersToNewManagers (
696723 Collection <AgentTracker > trackers , String stream , BackOffDelayPolicy delayPolicy ) {
697- AsyncRetry .asyncRetry (() -> getBrokerForProducer (stream ))
724+ AsyncRetry .asyncRetry (() -> findCandidateNodes (stream , forceLeader ))
698725 .description ("Candidate lookup to publish to " + stream )
699726 .scheduler (environment .scheduledExecutorService ())
700727 .retry (ex -> !(ex instanceof StreamDoesNotExistException ))
701728 .delayPolicy (delayPolicy )
702729 .build ()
703730 .thenAccept (
704- broker -> {
731+ candidates -> {
732+ Broker broker = pickBroker (candidates );
705733 String key = keyForNode (broker );
706734 LOGGER .debug (
707735 "Assigning {} producer(s) and consumer tracker(s) to {}" , trackers .size (), key );
708- trackers .forEach (tracker -> maybeRecoverAgent (broker , tracker ));
736+ trackers .forEach (tracker -> maybeRecoverAgent (broker , candidates , tracker ));
709737 })
710738 .exceptionally (
711739 ex -> {
@@ -730,10 +758,11 @@ private void assignProducersToNewManagers(
730758 });
731759 }
732760
733- private void maybeRecoverAgent (Broker broker , AgentTracker tracker ) {
761+ private void maybeRecoverAgent (
762+ Broker broker , List <BrokerWrapper > candidates , AgentTracker tracker ) {
734763 if (tracker .markRecoveryInProgress ()) {
735764 try {
736- recoverAgent (broker , tracker );
765+ recoverAgent (broker , candidates , tracker );
737766 } catch (Exception e ) {
738767 LOGGER .warn (
739768 "Error while recovering {} tracker {} (stream '{}'). Reason: {}" ,
@@ -750,14 +779,14 @@ private void maybeRecoverAgent(Broker broker, AgentTracker tracker) {
750779 }
751780 }
752781
753- private void recoverAgent (Broker node , AgentTracker tracker ) {
782+ private void recoverAgent (Broker node , List < BrokerWrapper > candidates , AgentTracker tracker ) {
754783 boolean reassignmentCompleted = false ;
755784 while (!reassignmentCompleted ) {
756785 try {
757786 if (tracker .isOpen ()) {
758787 LOGGER .debug (
759788 "Using {} to resume {} to {}" , node .label (), tracker .type (), tracker .stream ());
760- addToManager (node , tracker );
789+ addToManager (node , candidates , tracker );
761790 tracker .running ();
762791 } else {
763792 LOGGER .debug (
@@ -776,14 +805,15 @@ private void recoverAgent(Broker node, AgentTracker tracker) {
776805 tracker .identifiable () ? tracker .id () : "N/A" ,
777806 tracker .stream ());
778807 // maybe not a good candidate, let's refresh and retry for this one
779- node =
808+ candidates =
780809 Utils .callAndMaybeRetry (
781- () -> getBrokerForProducer (tracker .stream ()),
810+ () -> findCandidateNodes (tracker .stream (), forceLeader ),
782811 ex -> !(ex instanceof StreamDoesNotExistException ),
783812 environment .recoveryBackOffDelayPolicy (),
784813 "Candidate lookup for %s on stream '%s'" ,
785814 tracker .type (),
786815 tracker .stream ());
816+ node = pickBroker (candidates );
787817 } catch (Exception e ) {
788818 LOGGER .warn (
789819 "Error while re-assigning {} (stream '{}')" , tracker .type (), tracker .stream (), e );
0 commit comments