diff --git a/src/org/jgroups/protocols/UNICAST3.java b/src/org/jgroups/protocols/UNICAST3.java index df4d8e9a25..352897a00e 100644 --- a/src/org/jgroups/protocols/UNICAST3.java +++ b/src/org/jgroups/protocols/UNICAST3.java @@ -619,33 +619,35 @@ public Object down(Event evt) { switch (evt.getType()) { case Event.VIEW_CHANGE: // remove connections to peers that are not members anymore ! - View view=evt.getArg(); - List
new_members=view.getMembers(); - Set
non_members=new HashSet<>(send_table.keySet()); - non_members.addAll(recv_table.keySet()); - members=new_members; - new_members.forEach(non_members::remove); - if(cache != null) - cache.removeAll(new_members); - - if(!non_members.isEmpty()) { - log.trace("%s: closing connections of non members %s", local_addr, non_members); - // remove all non members, except those from remote sites: https://issues.redhat.com/browse/JGRP-2729 - non_members.stream().filter(this::isLocal).forEach(this::closeConnection); - } - if(!new_members.isEmpty()) { - for(Address mbr: new_members) { - Entry e=send_table.get(mbr); - if(e != null && e.state() == State.CLOSING) - e.state(State.OPEN); - e=recv_table.get(mbr); - if(e != null && e.state() == State.CLOSING) - e.state(State.OPEN); + synchronized (this) { + View view = evt.getArg(); + List
new_members = view.getMembers(); + Set
non_members = new HashSet<>(send_table.keySet()); + non_members.addAll(recv_table.keySet()); + members = new_members; + new_members.forEach(non_members::remove); + if (cache != null) + cache.removeAll(new_members); + + if (!non_members.isEmpty()) { + log.trace("%s: closing connections of non members %s", local_addr, non_members); + // remove all non members, except those from remote sites: https://issues.redhat.com/browse/JGRP-2729 + non_members.stream().filter(this::isLocal).forEach(this::closeConnection); + } + if (!new_members.isEmpty()) { + for (Address mbr : new_members) { + Entry e = send_table.get(mbr); + if (e != null && e.state() == State.CLOSING) + e.state(State.OPEN); + e = recv_table.get(mbr); + if (e != null && e.state() == State.CLOSING) + e.state(State.OPEN); + } } + xmit_task_map.keySet().retainAll(new_members); + last_sync_sent.removeExpiredElements(); + break; } - xmit_task_map.keySet().retainAll(new_members); - last_sync_sent.removeExpiredElements(); - break; } return down_prot.down(evt); // Pass on to the layer below us @@ -1328,7 +1330,7 @@ public int removeConnections(boolean remove_send_connections, boolean remove_rec } @ManagedOperation(description="Triggers the retransmission task") - public void triggerXmit() { + public synchronized void triggerXmit() { // check for gaps in the received messages and ask senders to send them again for(Map.Entry entry: recv_table.entrySet()) { Address target=entry.getKey(); // target to send retransmit requests to