Skip to content

Commit

Permalink
Synchronize view change and retransmit task
Browse files Browse the repository at this point in the history
  • Loading branch information
wburns committed Jan 16, 2024
1 parent 84bbaac commit 89708b4
Showing 1 changed file with 28 additions and 26 deletions.
54 changes: 28 additions & 26 deletions src/org/jgroups/protocols/UNICAST3.java
Original file line number Diff line number Diff line change
Expand Up @@ -619,33 +619,35 @@ public Object down(Event evt) {
switch (evt.getType()) {

case Event.VIEW_CHANGE: // remove connections to peers that are not members anymore !
View view=evt.getArg();
List<Address> new_members=view.getMembers();
Set<Address> non_members=new HashSet<>(send_table.keySet());
non_members.addAll(recv_table.keySet());
members=new_members;
new_members.forEach(non_members::remove);
if(cache != null)
cache.removeAll(new_members);

if(!non_members.isEmpty()) {
log.trace("%s: closing connections of non members %s", local_addr, non_members);
// remove all non members, except those from remote sites: https://issues.redhat.com/browse/JGRP-2729
non_members.stream().filter(this::isLocal).forEach(this::closeConnection);
}
if(!new_members.isEmpty()) {
for(Address mbr: new_members) {
Entry e=send_table.get(mbr);
if(e != null && e.state() == State.CLOSING)
e.state(State.OPEN);
e=recv_table.get(mbr);
if(e != null && e.state() == State.CLOSING)
e.state(State.OPEN);
synchronized (this) {
View view = evt.getArg();
List<Address> new_members = view.getMembers();
Set<Address> non_members = new HashSet<>(send_table.keySet());
non_members.addAll(recv_table.keySet());
members = new_members;
new_members.forEach(non_members::remove);
if (cache != null)
cache.removeAll(new_members);

if (!non_members.isEmpty()) {
log.trace("%s: closing connections of non members %s", local_addr, non_members);
// remove all non members, except those from remote sites: https://issues.redhat.com/browse/JGRP-2729
non_members.stream().filter(this::isLocal).forEach(this::closeConnection);
}
if (!new_members.isEmpty()) {
for (Address mbr : new_members) {
Entry e = send_table.get(mbr);
if (e != null && e.state() == State.CLOSING)
e.state(State.OPEN);
e = recv_table.get(mbr);
if (e != null && e.state() == State.CLOSING)
e.state(State.OPEN);
}
}
xmit_task_map.keySet().retainAll(new_members);
last_sync_sent.removeExpiredElements();
break;
}
xmit_task_map.keySet().retainAll(new_members);
last_sync_sent.removeExpiredElements();
break;
}

return down_prot.down(evt); // Pass on to the layer below us
Expand Down Expand Up @@ -1328,7 +1330,7 @@ public int removeConnections(boolean remove_send_connections, boolean remove_rec
}

@ManagedOperation(description="Triggers the retransmission task")
public void triggerXmit() {
public synchronized void triggerXmit() {
// check for gaps in the received messages and ask senders to send them again
for(Map.Entry<Address,ReceiverEntry> entry: recv_table.entrySet()) {
Address target=entry.getKey(); // target to send retransmit requests to
Expand Down

0 comments on commit 89708b4

Please sign in to comment.