Skip to content

Commit

Permalink
Harden gossip consensus (#1911)
Browse files Browse the repository at this point in the history
* test larger clusters
* remove redundant purge
  • Loading branch information
rogeralsing authored Jan 27, 2023
1 parent e7c2608 commit 8023a7a
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 11 deletions.
6 changes: 2 additions & 4 deletions src/Proto.Cluster/Gossip/Gossip.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,7 @@ public ImmutableList<GossipUpdate> ReceiveState(GossipState remoteState)
}

_state = newState;

//TODO: Optimize
Purge();

CheckConsensus(updatedKeys);

return updates.ToImmutableList();
Expand Down Expand Up @@ -253,7 +251,7 @@ private void Purge()
{
//find all members that have sent topology
var members = _getMembers();

foreach (var memberId in _state.Members.Keys.ToArray())
{
if (!members.Contains(memberId))
Expand Down
4 changes: 1 addition & 3 deletions src/Proto.Cluster/Gossip/GossipActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ public class GossipActor : IActor
private static readonly ILogger Logger = Log.CreateLogger<GossipActor>();
private readonly TimeSpan _gossipRequestTimeout;
private readonly IGossip _internal;
private readonly ActorSystem _system;

// lookup from state key -> consensus checks

Expand All @@ -29,11 +28,10 @@ public GossipActor(
int gossipMaxSend
)
{
_system = system;
_gossipRequestTimeout = gossipRequestTimeout;

_internal = new Gossip(myId, gossipFanout, gossipMaxSend, instanceLogger,
() => _system.Cluster().MemberList.GetMembers());
() => system.Cluster().MemberList.GetMembers());
}

public Task ReceiveAsync(IContext context) =>
Expand Down
2 changes: 1 addition & 1 deletion src/Proto.Cluster/Gossip/Gossiper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public async Task<ImmutableDictionary<string, GossipKeyValue>> GetStateEntry(str
try
{
var res = await _context.RequestAsync<GetGossipStateEntryResponse>(_pid,
new GetGossipStateEntryRequest(key));
new GetGossipStateEntryRequest(key),CancellationTokens.FromSeconds(5));

return res.State;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Proto.Cluster/GossipContracts.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ message GossipState {
//we still know when _we_ as in this node, got this data.
//and we can measure time from then until now.
//
//if we got a hear-beat from another node, and X seconds pass, we can assume it to be dead
//if we got a heartbeat from another node, and X seconds pass, we can assume it to be dead
message GossipKeyValue {
int64 sequence_number = 2; //version is local to the owner member
google.protobuf.Any value = 4; //value is any format
Expand Down
4 changes: 2 additions & 2 deletions tests/Proto.Cluster.Tests/GossipCoreTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ public GossipCoreTests(ITestOutputHelper output)
[Fact]
public async Task Large_cluster_should_get_topology_consensus()
{
const int memberCount = 50;
const int fanout = 4;
const int memberCount = 300;
const int fanout = 3;

var members =
Enumerable
Expand Down

0 comments on commit 8023a7a

Please sign in to comment.