Skip to content

Commit

Permalink
handle push messages logic
Browse files Browse the repository at this point in the history
  • Loading branch information
0xNineteen committed Jul 23, 2023
1 parent bd2d09b commit 0af8f42
Showing 1 changed file with 34 additions and 1 deletion.
35 changes: 34 additions & 1 deletion src/gossip/gossip_service.zig
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const Protocol = @import("protocol.zig").Protocol;
const Ping = @import("protocol.zig").Ping;
const bincode = @import("bincode-zig");
const crds = @import("../gossip/crds.zig");
const CrdsTable = @import("../gossip/crds_table.zig").CrdsTable;
const Logger = @import("../trace/log.zig").Logger;

var gpa_allocator = std.heap.GeneralPurposeAllocator(.{}){};
Expand All @@ -22,12 +23,19 @@ var gpa = gpa_allocator.allocator();
const PacketChannel = Channel(Packet);
// const ProtocolChannel = Channel(Protocol);

const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 30000;

pub fn get_wallclock() u64 {
return @intCast(std.time.milliTimestamp());
}

pub const GossipService = struct {
cluster_info: *ClusterInfo,
gossip_socket: UdpSocket,
exit_sig: AtomicBool,
packet_channel: PacketChannel,
responder_channel: PacketChannel,
crds_table: CrdsTable,

const Self = @This();

Expand All @@ -39,19 +47,22 @@ pub const GossipService = struct {
) Self {
var packet_channel = PacketChannel.init(allocator, 10000);
var responder_channel = PacketChannel.init(allocator, 10000);
var crds_table = CrdsTable.init(allocator);

return Self{
.cluster_info = cluster_info,
.gossip_socket = gossip_socket,
.exit_sig = exit,
.packet_channel = packet_channel,
.responder_channel = responder_channel,
.crds_table = crds_table,
};
}

pub fn deinit(self: *Self) void {
self.packet_channel.deinit();
self.responder_channel.deinit();
self.crds_table.deinit();
}

pub fn run(self: *Self, logger: *Logger) !void {
Expand Down Expand Up @@ -105,7 +116,7 @@ pub const GossipService = struct {
const gossip_endpoint = try self.gossip_socket.getLocalEndPoint();
const gossip_addr = SocketAddr.init_ipv4(gossip_endpoint.address.ipv4.value, gossip_endpoint.port);
const unspecified_addr = SocketAddr.init_ipv4(.{ 0, 0, 0, 0 }, 0);
const wallclock = @as(u64, @intCast(std.time.milliTimestamp()));
const wallclock = get_wallclock();

var legacy_contact_info = crds.LegacyContactInfo{
.id = id,
Expand Down Expand Up @@ -192,10 +203,32 @@ pub const GossipService = struct {
logger.debugf("ping message verification failed...", .{});
}
},
.PushMessage => |*push| {
logger.debugf("got a push message: {any}", .{protocol_message});
const values = push[1];
handle_push_message(&self.crds_table, values, logger);
},
else => {
logger.debugf("got a protocol message: {any}", .{protocol_message});
},
}
}
}

pub fn handle_push_message(crds_table: *CrdsTable, values: []crds.CrdsValue, logger: *Logger) void {
var now = get_wallclock();

for (values) |value| {
const value_time = value.wallclock();
const is_too_new = value_time > now +| CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS;
const is_too_old = value_time < now -| CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS;
if (is_too_new or is_too_old) {
continue;
}

crds_table.insert(value, now) catch {
logger.debugf("failed to insert into crds: {any}", .{value});
};
}
}
};

0 comments on commit 0af8f42

Please sign in to comment.