Skip to content

Commit

Permalink
Merge pull request #16 from Syndica/19/prune_msgs
Browse files Browse the repository at this point in the history
gossip: Prune Messages
  • Loading branch information
ultd committed Aug 18, 2023
2 parents 5ff3020 + ac31b93 commit 85ed1c4
Show file tree
Hide file tree
Showing 23 changed files with 827 additions and 383 deletions.
83 changes: 83 additions & 0 deletions remove_unused.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# remove_unused.py — iteratively strip unused `@import` declarations from .zig files.
#
# Usage: python remove_unused.py <dir name>
#
# Repeatedly scans every .zig file under the given directory, finds
# `const X = @import(...)` / `pub const X = @import(...)` lines whose
# variable name is never referenced on any other line of the same file,
# and deletes them. It loops until a full pass removes nothing, because
# removing one import can orphan another.
import os
import sys

if len(sys.argv) != 2:
    print("Usage: python remove_unused.py <dir name>")
    sys.exit()

# Collect every .zig file under the root directory (recursive walk).
zig_files = []
for root, _dirs, files in os.walk(sys.argv[1]):
    for file in files:
        if file.endswith('.zig'):
            zig_files.append(os.path.join(root, file))

n_remove_iter = 0
n_removes = 1  # non-zero so the first pass runs
while n_removes > 0:
    n_removes = 0

    for filename in zig_files:
        print(filename)

        # Read the whole file into memory; files are rewritten in place.
        with open(filename, 'r') as f:
            full_lines = f.readlines()

        # Find `const {VAR} = @import ...` declarations and record the
        # variable name together with its 0-based line index.
        import_var_names = []
        for i, line in enumerate(full_lines):
            if not (line.startswith('const') or line.startswith('pub const')):
                continue
            if '@import' not in line:
                continue
            # index("const ") works for both `const X = ...` and
            # `pub const X = ...` since it finds the embedded "const ".
            start_index = line.index("const ")
            end_index = line.index(" = ")
            var_name = line[start_index + 6:end_index]
            import_var_names.append((var_name, i))

        # A declaration counts as "used" if its name appears anywhere
        # other than its own declaration line.
        # NOTE(review): this is a plain substring match — a var named
        # `std` would treat `stdout` as a use. Conservative: it may keep
        # an unused import, but never removes a used one.
        unused_vars = import_var_names.copy()
        for i, line in enumerate(full_lines):
            for var, line_num in import_var_names:
                if var in line and i != line_num:
                    if (var, line_num) in unused_vars:
                        unused_vars.remove((var, line_num))

        # Drop the unused declaration lines (set for O(1) membership).
        lines_to_remove = {i for (_, i) in unused_vars}
        n_removes += len(lines_to_remove)

        new_lines = [line for i, line in enumerate(full_lines)
                     if i not in lines_to_remove]

        print(unused_vars)

        # Rewrite the file without the unused import lines.
        with open(filename, 'w') as f:
            f.writelines(new_lines)

    # Bug fix: the original reset n_removes to 0 and then printed it,
    # so every iteration reported "lines removed: 0". Report the real
    # count after the pass completes.
    print(f"iteration: {n_remove_iter}, lines removed: {n_removes}")
    n_remove_iter += 1


4 changes: 2 additions & 2 deletions src/core/pubkey.zig
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ pub const Pubkey = struct {
};

var written = encoder.encode(bytes, &dest) catch @panic("could not encode pubkey");
if (written != 44) {
@panic("written is not 44");
if (written > 44) {
std.debug.panic("written is > 44, written: {}, dest: {any}, bytes: {any}", .{ written, dest, bytes });
}
return Self{ .data = bytes[0..32].*, .cached_str = dest[0..44].* };
}
Expand Down
2 changes: 0 additions & 2 deletions src/core/shred.zig
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
const std = @import("std");

/// ***ShredVersion***
/// Currently it's being manually set.
///
Expand Down
172 changes: 119 additions & 53 deletions src/gossip/active_set.zig
Original file line number Diff line number Diff line change
@@ -1,18 +1,10 @@
const std = @import("std");
const ClusterInfo = @import("cluster_info.zig").ClusterInfo;
const network = @import("zig-network");
const EndPoint = network.EndPoint;
const Packet = @import("packet.zig").Packet;
const PACKET_DATA_SIZE = @import("packet.zig").PACKET_DATA_SIZE;
const Channel = @import("../sync/channel.zig").Channel;
const Thread = std.Thread;
const AtomicBool = std.atomic.Atomic(bool);
const UdpSocket = network.Socket;
const Tuple = std.meta.Tuple;
const SocketAddr = @import("net.zig").SocketAddr;
const Protocol = @import("protocol.zig").Protocol;
const Ping = @import("ping_pong.zig").Ping;
const bincode = @import("../bincode/bincode.zig");
const crds = @import("../gossip/crds.zig");
const CrdsValue = crds.CrdsValue;

Expand All @@ -22,93 +14,167 @@ const get_wallclock = @import("../gossip/crds.zig").get_wallclock;

const _crds_table = @import("../gossip/crds_table.zig");
const CrdsTable = _crds_table.CrdsTable;
const CrdsError = _crds_table.CrdsError;
const Logger = @import("../trace/log.zig").Logger;
const GossipRoute = _crds_table.GossipRoute;

const pull_request = @import("../gossip/pull_request.zig");
const CrdsFilter = pull_request.CrdsFilter;
const MAX_NUM_PULL_REQUESTS = pull_request.MAX_NUM_PULL_REQUESTS;

const pull_response = @import("../gossip/pull_response.zig");
const GossipService = @import("../gossip/gossip_service.zig").GossipService;

var gpa_allocator = std.heap.GeneralPurposeAllocator(.{}){};
var gpa = gpa_allocator.allocator();

const PacketChannel = Channel(Packet);
const ProtocolMessage = struct { from_addr: EndPoint, message: Protocol };
const ProtocolChannel = Channel(ProtocolMessage);

const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 30000;
const FAILED_INSERTS_RETENTION_MS: u64 = 20_000;
const MAX_BYTES_PER_PUSH: u64 = PACKET_DATA_SIZE * 64;
const PUSH_MESSAGE_MAX_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - 44;

const GOSSIP_SLEEP_MILLIS: u64 = 100;
const Bloom = @import("../bloom/bloom.zig").Bloom;

const NUM_ACTIVE_SET_ENTRIES: usize = 25;
const CRDS_GOSSIP_PUSH_FANOUT: usize = 6;
pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6;

const MIN_NUM_BLOOM_ITEMS: usize = 512;
const BLOOM_FALSE_RATE: f64 = 0.1;
const BLOOM_MAX_BITS: usize = 1024 * 8 * 4;

pub const ActiveSet = struct {
// store pubkeys as keys in crds table bc the data can change
peers: [NUM_ACTIVE_SET_ENTRIES]Pubkey,
len: u8,
pruned_peers: std.AutoHashMap(Pubkey, Bloom),
len: u8 = 0,

const Self = @This();

pub fn init() Self {
pub fn rotate(
alloc: std.mem.Allocator,
crds_table: *CrdsTable,
my_pubkey: Pubkey,
my_shred_version: u16,
) !Self {
const now = get_wallclock();
var buf: [NUM_ACTIVE_SET_ENTRIES]crds.LegacyContactInfo = undefined;
var crds_peers = try GossipService.get_gossip_nodes(
crds_table,
&my_pubkey,
my_shred_version,
&buf,
NUM_ACTIVE_SET_ENTRIES,
now,
);

var peers: [NUM_ACTIVE_SET_ENTRIES]Pubkey = undefined;
var pruned_peers = std.AutoHashMap(Pubkey, Bloom).init(alloc);

if (crds_peers.len == 0) {
return Self{ .peers = peers, .len = 0, .pruned_peers = pruned_peers };
}

const size = @min(crds_peers.len, NUM_ACTIVE_SET_ENTRIES);
var rng = std.rand.DefaultPrng.init(get_wallclock());
pull_request.shuffle_first_n(rng.random(), crds.LegacyContactInfo, crds_peers, size);

const bloom_num_items = @max(crds_peers.len, MIN_NUM_BLOOM_ITEMS);
for (0..size) |i| {
peers[i] = crds_peers[i].id;

// *full* hard restart on blooms -- labs doesnt do this - bug?
var bloom = try Bloom.random(
alloc,
bloom_num_items,
BLOOM_FALSE_RATE,
BLOOM_MAX_BITS,
);
try pruned_peers.put(peers[i], bloom);
}

return Self{
.peers = undefined,
.len = 0,
.peers = peers,
.len = size,
.pruned_peers = pruned_peers,
};
}

pub fn deinit(self: *Self) void {
for (self.peers[0..self.len]) |peer| {
var entry = self.pruned_peers.getEntry(peer).?;
entry.value_ptr.deinit();
}
self.pruned_peers.deinit();
}

pub fn prune(self: *Self, from: Pubkey, origin: Pubkey) void {
if (self.pruned_peers.getEntry(from)) |entry| {
const origin_bytes = origin.data;
entry.value_ptr.add(&origin_bytes);
}
}

pub fn get_fanout_peers(
self: *const Self,
allocator: std.mem.Allocator,
origin: Pubkey,
crds_table: *CrdsTable,
) !std.ArrayList(EndPoint) {
var active_set_endpoints = std.ArrayList(EndPoint).init(allocator);
errdefer active_set_endpoints.deinit();

// change to while loop
crds_table.read();
errdefer crds_table.release_read();

for (self.peers[0..self.len]) |peer_pubkey| {
const peer_info = crds_table.get(crds.CrdsValueLabel{
.LegacyContactInfo = peer_pubkey,
}).?;
const peer_gossip_addr = peer_info.value.data.LegacyContactInfo.gossip;

crds.sanitize_socket(&peer_gossip_addr) catch continue;
try active_set_endpoints.append(peer_gossip_addr.toEndpoint());

const entry = self.pruned_peers.getEntry(peer_pubkey).?;
const origin_bytes = origin.data;
if (entry.value_ptr.contains(&origin_bytes)) {
continue;
}

try active_set_endpoints.append(peer_gossip_addr.toEndpoint());
if (active_set_endpoints.items.len == CRDS_GOSSIP_PUSH_FANOUT) {
break;
}
}
crds_table.release_read();

return active_set_endpoints;
}
};

pub fn reset(self: *Self, crds_table: *CrdsTable, my_pubkey: Pubkey, my_shred_version: u16) !void {
const now = get_wallclock();
var buf: [NUM_ACTIVE_SET_ENTRIES]crds.LegacyContactInfo = undefined;
var crds_peers = try GossipService.get_gossip_nodes(
crds_table,
&my_pubkey,
my_shred_version,
&buf,
NUM_ACTIVE_SET_ENTRIES,
now,
);
test "gossip.active_set: init/deinit" {
var alloc = std.testing.allocator;

const size = @min(crds_peers.len, NUM_ACTIVE_SET_ENTRIES);
var rng = std.rand.DefaultPrng.init(get_wallclock());
pull_request.shuffle_first_n(rng.random(), crds.LegacyContactInfo, crds_peers, size);
var crds_table = try CrdsTable.init(alloc);
defer crds_table.deinit();

for (0..size) |i| {
self.peers[i] = crds_peers[i].id;
}
self.len = size;
// insert some contacts
var rng = std.rand.DefaultPrng.init(100);
for (0..CRDS_GOSSIP_PUSH_FANOUT) |_| {
var keypair = try KeyPair.create(null);
var value = try CrdsValue.random_with_index(rng.random(), keypair, 0);
try crds_table.insert(value, get_wallclock());
}
};

var kp = try KeyPair.create(null);
var pk = Pubkey.fromPublicKey(&kp.public_key, true);
var active_set = try ActiveSet.rotate(
alloc,
&crds_table,
pk,
0,
);
defer active_set.deinit();

try std.testing.expect(active_set.len == CRDS_GOSSIP_PUSH_FANOUT);

const origin = Pubkey.random(rng.random(), .{});

var fanout = try active_set.get_fanout_peers(alloc, origin, &crds_table);
defer fanout.deinit();
const no_prune_fanout_len = fanout.items.len;
try std.testing.expect(no_prune_fanout_len > 0);

const peer_pubkey = active_set.peers[0];
active_set.prune(peer_pubkey, origin);

var fanout_with_prune = try active_set.get_fanout_peers(alloc, origin, &crds_table);
defer fanout_with_prune.deinit();
try std.testing.expectEqual(no_prune_fanout_len, fanout_with_prune.items.len + 1);
}
4 changes: 0 additions & 4 deletions src/gossip/cluster_info.zig
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@ const LegacyContactInfo = @import("crds.zig").LegacyContactInfo;
const NodeInstance = @import("crds.zig").NodeInstance;
const KeyPair = std.crypto.sign.Ed25519.KeyPair;
const ArrayList = std.ArrayList;
const Channel = @import("../sync/channel.zig").Channel;
const Packet = @import("packet.zig").Packet;
const Protocol = @import("protocol.zig").Protocol;
const bincode = @import("../bincode/bincode.zig");
const CrdsValue = @import("crds.zig").CrdsValue;
const CrdsData = @import("crds.zig").CrdsData;
const Version = @import("crds.zig").Version;
Expand Down
1 change: 0 additions & 1 deletion src/gossip/cmd.zig
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
const std = @import("std");
const GossipService = @import("gossip_service.zig").GossipService;
const ClusterInfo = @import("cluster_info.zig").ClusterInfo;
const network = @import("zig-network");
const Keypair = std.crypto.sign.Ed25519.KeyPair;
const SecretKey = std.crypto.sign.Ed25519.SecretKey;
const AtomicBool = std.atomic.Atomic(bool);
Expand Down
4 changes: 0 additions & 4 deletions src/gossip/crds.zig
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ const Option = @import("../option.zig").Option;
const ContactInfo = @import("node.zig").ContactInfo;
const bincode = @import("../bincode/bincode.zig");
const ArrayList = std.ArrayList;
const ArrayListConfig = @import("../utils/arraylist.zig").ArrayListConfig;
const Bloom = @import("../bloom/bloom.zig").Bloom;
const KeyPair = std.crypto.sign.Ed25519.KeyPair;
const Pubkey = @import("../core/pubkey.zig").Pubkey;
const sanitize_wallclock = @import("./protocol.zig").sanitize_wallclock;
Expand All @@ -30,8 +28,6 @@ pub const CrdsVersionedValue = struct {
value_hash: Hash,
timestamp_on_insertion: u64,
cursor_on_insertion: u64,
/// Number of times duplicates of this value are received from gossip push.
num_push_dups: u8,
};

pub const CrdsValue = struct {
Expand Down
Loading

0 comments on commit 85ed1c4

Please sign in to comment.