Skip to content

Commit

Permalink
fix: register SnapshotReplicationListener in raft context
Browse files Browse the repository at this point in the history
(cherry picked from commit be9f78f)
  • Loading branch information
lenaschoenburg authored and github-actions[bot] committed Feb 28, 2022
1 parent 7174005 commit 96ec2c8
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 14 deletions.
32 changes: 21 additions & 11 deletions atomix/cluster/src/main/java/io/atomix/raft/impl/RaftContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -469,12 +469,15 @@ public long setCommitIndex(final long commitIndex) {
*/
public void addSnapshotReplicationListener(
final SnapshotReplicationListener snapshotReplicationListener) {
snapshotReplicationListeners.add(snapshotReplicationListener);
if (ongoingSnapshotReplication) {
// Notify listener immediately if it registered during an ongoing replication.
// This is to prevent missing necessary state transitions.
snapshotReplicationListener.onSnapshotReplicationStarted();
}
threadContext.execute(
() -> {
snapshotReplicationListeners.add(snapshotReplicationListener);
if (ongoingSnapshotReplication) {
// Notify listener immediately if it registered during an ongoing replication.
// This is to prevent missing necessary state transitions.
snapshotReplicationListener.onSnapshotReplicationStarted();
}
});
}

/**
Expand All @@ -484,17 +487,24 @@ public void addSnapshotReplicationListener(
*/
public void removeSnapshotReplicationListener(
final SnapshotReplicationListener snapshotReplicationListener) {
snapshotReplicationListeners.remove(snapshotReplicationListener);
threadContext.execute(() -> snapshotReplicationListeners.remove(snapshotReplicationListener));
}

public void notifySnapshotReplicationStarted() {
ongoingSnapshotReplication = true;
snapshotReplicationListeners.forEach(SnapshotReplicationListener::onSnapshotReplicationStarted);
threadContext.execute(
() -> {
ongoingSnapshotReplication = true;
snapshotReplicationListeners.forEach(
SnapshotReplicationListener::onSnapshotReplicationStarted);
});
}

public void notifySnapshotReplicationCompleted() {
snapshotReplicationListeners.forEach(l -> l.onSnapshotReplicationCompleted(term));
ongoingSnapshotReplication = false;
threadContext.execute(
() -> {
snapshotReplicationListeners.forEach(l -> l.onSnapshotReplicationCompleted(term));
ongoingSnapshotReplication = false;
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

Expand Down Expand Up @@ -68,8 +67,7 @@ public void shouldNotifyOnRegisteringListener() {
final var follower = raftRule.getFollower().orElseThrow();
// then
follower.getContext().notifySnapshotReplicationStarted();
verify(snapshotReplicationListener, never()).onSnapshotReplicationStarted();
follower.getContext().addSnapshotReplicationListener(snapshotReplicationListener);
verify(snapshotReplicationListener).onSnapshotReplicationStarted();
verify(snapshotReplicationListener, timeout(1_000).times(1)).onSnapshotReplicationStarted();
}
}

0 comments on commit 96ec2c8

Please sign in to comment.