@@ -93,6 +93,11 @@ public void onFailure(Exception e) {
9393 logger .warn ("failed to send back failure on join request" , inner );
9494 }
9595 }
96+
97+ @ Override
98+ public String toString () {
99+ return "JoinCallback{request=" + request + "}" ;
100+ }
96101 }));
97102 }
98103
@@ -115,8 +120,10 @@ public void clearAndFailPendingJoins(String reason) {
115120
116121 public void clearAndSubmitPendingJoins () {
117122 final Map <JoinTaskExecutor .Task , ClusterStateTaskListener > pendingAsTasks = new HashMap <>();
118- joinRequestAccumulator .forEach ((key , value ) -> pendingAsTasks .put (new JoinTaskExecutor .Task (key , "elect leader" ),
119- new JoinTaskListener (value )));
123+ joinRequestAccumulator .forEach ((key , value ) -> {
124+ final JoinTaskExecutor .Task task = new JoinTaskExecutor .Task (key , "elect leader" );
125+ pendingAsTasks .put (task , new JoinTaskListener (task , value ));
126+ });
120127 joinRequestAccumulator .clear ();
121128
122129 pendingAsTasks .put (JoinTaskExecutor .BECOME_MASTER_TASK , (source , e ) -> {});
@@ -127,9 +134,9 @@ public void clearAndSubmitPendingJoins() {
127134
128135 public void joinLeader (JoinRequest joinRequest , JoinCallback joinCallback ) {
129136 // submit as cluster state update task
130- masterService . submitStateUpdateTask ( "node- join" ,
131- new JoinTaskExecutor . Task ( joinRequest . getSourceNode (), " join existing leader" ) , ClusterStateTaskConfig .build (Priority .URGENT ),
132- joinTaskExecutor , new JoinTaskListener (joinCallback ));
137+ final JoinTaskExecutor . Task task = new JoinTaskExecutor . Task ( joinRequest . getSourceNode (), " join existing leader" );
138+ masterService . submitStateUpdateTask ( "node- join" , task , ClusterStateTaskConfig .build (Priority .URGENT ),
139+ joinTaskExecutor , new JoinTaskListener (task , joinCallback ));
133140 }
134141
135142 public interface JoinCallback {
@@ -139,9 +146,11 @@ public interface JoinCallback {
139146 }
140147
141148 static class JoinTaskListener implements ClusterStateTaskListener {
149+ private final JoinTaskExecutor .Task task ;
142150 private final JoinCallback joinCallback ;
143151
144- JoinTaskListener (JoinCallback joinCallback ) {
152+ JoinTaskListener (JoinTaskExecutor .Task task , JoinCallback joinCallback ) {
153+ this .task = task ;
145154 this .joinCallback = joinCallback ;
146155 }
147156
@@ -154,6 +163,11 @@ public void onFailure(String source, Exception e) {
154163 public void clusterStateProcessed (String source , ClusterState oldState , ClusterState newState ) {
155164 joinCallback .onSuccess ();
156165 }
166+
167+ @ Override
168+ public String toString () {
169+ return "JoinTaskListener{task=" + task + "}" ;
170+ }
157171 }
158172
159173}
0 commit comments