Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gossip: trimming crds table values #18

Merged
merged 18 commits into from
Aug 23, 2023
Merged
25 changes: 19 additions & 6 deletions src/bincode/bincode.zig
Original file line number Diff line number Diff line change
Expand Up @@ -522,28 +522,36 @@ pub inline fn shouldUseDefaultValue(comptime field: std.builtin.Type.StructField
/// Serializes `data` with the bincode serializer into the caller-provided
/// `slice` and returns the portion of `slice` that was written.
///
/// No allocator is handed to getty (`null`), so the output is bounded by
/// `slice.len`; any error from the serializer or the fixed buffer stream
/// is propagated to the caller.
pub fn writeToSlice(slice: []u8, data: anytype, params: Params) ![]u8 {
    var fixed_stream = std.io.fixedBufferStream(slice);

    var bincode_serializer = serializer(fixed_stream.writer(), params);
    try getty.serialize(null, data, bincode_serializer.serializer());

    // only the bytes produced by the serializer, not the whole buffer
    return fixed_stream.getWritten();
}

/// Serializes `data` with the bincode serializer into a freshly allocated,
/// growable byte list.
///
/// The caller owns the returned list and must call `deinit()` on it.
/// On error the partially written list is released before returning.
pub fn writeToArray(alloc: std.mem.Allocator, data: anytype, params: Params) !std.ArrayList(u8) {
    // 2048 is only the initial capacity; the list grows if the value is larger.
    var array_buf = try std.ArrayList(u8).initCapacity(alloc, 2048);
    // without this the buffer would leak whenever serialization fails
    errdefer array_buf.deinit();

    var s = serializer(array_buf.writer(), params);
    const ss = s.serializer();

    try getty.serialize(alloc, data, ss);

    return array_buf;
}

/// Serializes `data` with the bincode serializer directly into `writer`.
///
/// `alloc` is forwarded to getty as-is and may be `null`. Errors from the
/// serializer (including those raised by `writer`) are propagated.
pub fn write(alloc: ?std.mem.Allocator, writer: anytype, data: anytype, params: Params) !void {
    var bincode_serializer = serializer(writer, params);
    try getty.serialize(alloc, data, bincode_serializer.serializer());
}

/// Returns the number of bytes `data` occupies once bincode-serialized
/// with `params`.
///
/// The value is serialized into a temporary list (via `writeToArray`) that
/// is freed before returning, so `alloc` is only used for scratch space.
pub fn get_serialized_size(alloc: std.mem.Allocator, data: anytype, params: Params) !usize {
    // `writeToArray` already performs the serialization; serializing again
    // into the same list would double the reported length.
    var list = try writeToArray(alloc, data, params);
    defer list.deinit();

    return list.items.len;
}

Expand Down Expand Up @@ -705,6 +713,11 @@ test "bincode: test serialization" {
var out4 = try writeToSlice(&buf6, _s, Params.standard);
var result = try readFromSlice(null, s, out4, Params{});
try std.testing.expectEqual(result, _s);

// ensure write to array works too
var array_buf = try writeToArray(std.testing.allocator, _s, Params.standard);
defer array_buf.deinit();
try std.testing.expectEqualSlices(u8, out4, array_buf.items);
}

test "bincode: (legacy) serialize an array" {
Expand Down
6 changes: 3 additions & 3 deletions src/gossip/crds_shards.zig
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub const CrdsShards = struct {
try shard.put(crds_index, uhash);
}

pub fn remove(self: *Self, crds_index: usize, hash: *const Hash) !void {
pub fn remove(self: *Self, crds_index: usize, hash: *const Hash) void {
const uhash = CrdsPull.hash_to_u64(hash);
const shard_index = CrdsShards.compute_shard_index(self.shard_bits, uhash);
const shard = &self.shards[shard_index];
Expand Down Expand Up @@ -118,7 +118,7 @@ test "gossip.crds_shards: tests CrdsShards" {

const v = Hash.random();
try shards.insert(10, &v);
try shards.remove(10, &v);
shards.remove(10, &v);

const result = try shards.find(std.testing.allocator, 20, 10);
defer result.deinit();
Expand Down Expand Up @@ -168,7 +168,7 @@ test "gossip.crds_shards: test shard find" {
var rand = std.rand.DefaultPrng.init(seed);
const rng = rand.random();

while (values.items.len < 1000) {
while (values.items.len < 50) {
const value = try new_test_crds_value(rng, &crds_table);
try values.append(value);
}
Expand Down
152 changes: 140 additions & 12 deletions src/gossip/crds_table.zig
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ const AutoHashMap = std.AutoHashMap;

const bincode = @import("../bincode/bincode.zig");

const hash = @import("../core/hash.zig");
const Hash = hash.Hash;
const CompareResult = hash.CompareResult;
const _hash = @import("../core/hash.zig");
const Hash = _hash.Hash;
const CompareResult = _hash.CompareResult;

const CrdsShards = @import("./crds_shards.zig").CrdsShards;

Expand All @@ -24,6 +24,8 @@ const Pubkey = @import("../core/pubkey.zig").Pubkey;
const KeyPair = std.crypto.sign.Ed25519.KeyPair;
const RwLock = std.Thread.RwLock;

/// Capacity, in unique pubkeys, for the crds table.
/// NOTE(review): presumably the `max_pubkey_capacity` passed to
/// `CrdsTable.attempt_trim` — confirm against callers.
pub const CRDS_UNIQUE_PUBKEY_CAPACITY: usize = 8192;
0xNineteen marked this conversation as resolved.
Show resolved Hide resolved

pub const CrdsError = error{
OldValue,
DuplicateValue,
Expand All @@ -33,6 +35,11 @@ pub const HashAndTime = struct { hash: Hash, timestamp: u64 };
// TODO: benchmark other structs?
/// Queue type (`std.TailQueue`) whose nodes carry a `HashAndTime`.
const PurgedQ = std.TailQueue(HashAndTime);

/// Indexable hash set of `T`: an `AutoArrayHashMap` keyed by `T` whose
/// values are `void`, so only the keys carry information. Members are
/// inserted with `put(key, {})` and can be read back by position via `keys()`.
pub fn AutoArrayHashSet(comptime T: type) type {
    const Set = AutoArrayHashMap(T, void);
    return Set;
}

/// Cluster Replicated Data Store: stores gossip data
/// the self.store uses an AutoArrayHashMap which is a HashMap that also allows for
/// indexing values (value = arrayhashmap[0]). This allows us to insert data
Expand All @@ -49,7 +56,7 @@ pub const CrdsTable = struct {
store: AutoArrayHashMap(CrdsValueLabel, CrdsVersionedValue),

// special types tracked with their index
contact_infos: AutoArrayHashMap(usize, void), // hashset for O(1) insertion/removal
contact_infos: AutoArrayHashSet(usize),
votes: AutoArrayHashMap(usize, usize),
epoch_slots: AutoArrayHashMap(usize, usize),
duplicate_shreds: AutoArrayHashMap(usize, usize),
Expand All @@ -58,6 +65,9 @@ pub const CrdsTable = struct {
// tracking for cursor to index
entries: AutoArrayHashMap(u64, usize),

// Indices of all crds values associated with a node/pubkey.
pubkey_to_values: AutoArrayHashMap(Pubkey, AutoArrayHashSet(usize)),

// used to build pull responses efficiently
shards: CrdsShards,

Expand All @@ -70,19 +80,23 @@ pub const CrdsTable = struct {
// thread safe
lock: RwLock = .{},

allocator: std.mem.Allocator,

const Self = @This();

/// Creates an empty table whose containers all allocate from `allocator`.
///
/// The allocator is also stored on the table so later operations can
/// allocate without receiving it again. The only fallible step is
/// `CrdsShards.init`. Release the table with `deinit`.
pub fn init(allocator: std.mem.Allocator) !Self {
    return Self{
        .store = AutoArrayHashMap(CrdsValueLabel, CrdsVersionedValue).init(allocator),
        // each field may be initialised only once: a second `.contact_infos`
        // entry is a duplicate-field error
        .contact_infos = AutoArrayHashSet(usize).init(allocator),
        .shred_versions = AutoHashMap(Pubkey, u16).init(allocator),
        .votes = AutoArrayHashMap(usize, usize).init(allocator),
        .epoch_slots = AutoArrayHashMap(usize, usize).init(allocator),
        .duplicate_shreds = AutoArrayHashMap(usize, usize).init(allocator),
        .entries = AutoArrayHashMap(u64, usize).init(allocator),
        .pubkey_to_values = AutoArrayHashMap(Pubkey, AutoArrayHashSet(usize)).init(allocator),
        .shards = try CrdsShards.init(allocator),
        .purged = HashTimeQueue.init(),
        .allocator = allocator,
    };
}

Expand All @@ -93,8 +107,14 @@ pub const CrdsTable = struct {
self.votes.deinit();
self.epoch_slots.deinit();
self.duplicate_shreds.deinit();
self.shards.deinit();
self.entries.deinit();
self.shards.deinit();

var iter = self.pubkey_to_values.iterator();
while (iter.next()) |entry| {
entry.value_ptr.deinit();
}
self.pubkey_to_values.deinit();
}

pub fn write(self: *Self) void {
Expand All @@ -118,11 +138,9 @@ pub const CrdsTable = struct {
}

pub fn insert(self: *Self, value: CrdsValue, now: u64) !void {
// TODO: check to make sure this sizing is correct or use heap

var buf = [_]u8{0} ** 2048; // does this break if its called in parallel? / dangle?
var bytes = try bincode.writeToSlice(&buf, value, bincode.Params.standard);
const value_hash = Hash.generateSha256Hash(bytes);
const bytes = try bincode.writeToArray(self.allocator, value, bincode.Params.standard);
0xNineteen marked this conversation as resolved.
Show resolved Hide resolved
const value_hash = Hash.generateSha256Hash(bytes.items);
bytes.deinit();
const versioned_value = CrdsVersionedValue{
.value = value,
.value_hash = value_hash,
Expand All @@ -133,6 +151,7 @@ pub const CrdsTable = struct {
const label = value.label();
var result = try self.store.getOrPut(label);
const entry_index = result.index;
const origin = value.id();

// entry doesnt exist
if (!result.found_existing) {
Expand All @@ -157,6 +176,15 @@ pub const CrdsTable = struct {

try self.entries.put(self.cursor, entry_index);

const maybe_node_entry = self.pubkey_to_values.getEntry(origin);
if (maybe_node_entry) |node_entry| {
try node_entry.value_ptr.put(entry_index, {});
} else {
var indexs = AutoArrayHashSet(usize).init(self.allocator);
try indexs.put(entry_index, {});
try self.pubkey_to_values.put(origin, indexs);
}

result.value_ptr.* = versioned_value;

self.cursor += 1;
Expand Down Expand Up @@ -189,13 +217,17 @@ pub const CrdsTable = struct {

// remove and insert to make sure the shard ordering is oldest-to-newest
// NOTE: do we need the ordering to be oldest-to-newest?
try self.shards.remove(entry_index, &old_entry.value_hash);
self.shards.remove(entry_index, &old_entry.value_hash);
try self.shards.insert(entry_index, &versioned_value.value_hash);

const did_remove = self.entries.swapRemove(old_entry.cursor_on_insertion);
std.debug.assert(did_remove);
try self.entries.put(self.cursor, entry_index);

// As long as the pubkey does not change, self.pubkey_to_values
// does not need to be updated.
std.debug.assert(old_entry.value.id().equals(&origin));

self.purged.insert(old_entry.value_hash, now);

result.value_ptr.* = versioned_value;
Expand Down Expand Up @@ -367,6 +399,78 @@ pub const CrdsTable = struct {
const indexs = try self.shards.find(alloc, mask, @intCast(mask_bits));
return indexs;
}

// ** trimming values in the crds table **
/// Drops whole pubkeys, together with every crds value recorded for them,
/// once the number of tracked pubkeys exceeds `max_pubkey_capacity` by more
/// than 10%.
///
/// When trimming triggers, `n_pubkeys - max_pubkey_capacity` pubkeys are
/// dropped. For every dropped value the value hash is pushed onto
/// `self.purged` (stamped with the current wallclock) and the value is
/// removed from the shards, the cursor `entries`, the per-type maps and
/// finally from `self.store`.
pub fn attempt_trim(self: *Self, max_pubkey_capacity: usize) void {
    const n_pubkeys = self.pubkey_to_values.count();
    // trim only once we are more than 10% over capacity:
    // 10 * n > 11 * capacity  <=>  n > 1.1 * capacity
    if (10 * n_pubkeys <= 11 * max_pubkey_capacity) return;

    const drop_size = n_pubkeys -| max_pubkey_capacity;
    const now = crds.get_wallclock();

    // TODO: drop based on stake weight
    for (0..drop_size) |i| {
        // Every iteration swap-removes exactly one pubkey, which moves the
        // last key into the vacated slot and shrinks the key slice. Re-read
        // the live slice each time and clamp the position so we never read
        // a slot that has already been vacated.
        const live_pubkeys = self.pubkey_to_values.keys();
        const pubkey = live_pubkeys[@min(i, live_pubkeys.len - 1)];

        // remove all entries associated with the pubkey
        // (a pubkey without a recorded shred version is fine)
        _ = self.shred_versions.remove(pubkey);

        var entry_indexs = self.pubkey_to_values.get(pubkey).?;
        defer {
            const did_remove = self.pubkey_to_values.swapRemove(pubkey);
            std.debug.assert(did_remove);
            entry_indexs.deinit();
        }

        for (entry_indexs.keys()) |entry_index| {
            // Read through the bounds-checked slices of the live store.
            // NOTE(review): `self.store.swapRemove` below moves the last
            // store entry into the vacated index; the indices recorded for
            // that moved entry (pubkey_to_values, shards, entries, ...) are
            // not rewritten here — confirm whether they must be re-pointed.
            const entry_value = self.store.values()[entry_index];
            const entry_key = self.store.keys()[entry_index];

            // add to purged
            const value_hash = entry_value.value_hash;
            self.purged.insert(value_hash, now);
            self.shards.remove(entry_index, &value_hash);
            {
                const did_remove = self.entries.swapRemove(entry_value.cursor_on_insertion);
                std.debug.assert(did_remove);
            }

            switch (entry_value.value.data) {
                .LegacyContactInfo => {
                    // NOTE(review): the field comment says these special
                    // types are tracked with their index — confirm that
                    // `cursor_on_insertion` (not `entry_index`) is the key
                    // `contact_infos` was populated with.
                    const did_remove = self.contact_infos.swapRemove(entry_value.cursor_on_insertion);
                    std.debug.assert(did_remove);
                },
                .Vote => {
                    const did_remove = self.votes.swapRemove(entry_value.cursor_on_insertion);
                    std.debug.assert(did_remove);
                },
                .EpochSlots => {
                    const did_remove = self.epoch_slots.swapRemove(entry_value.cursor_on_insertion);
                    std.debug.assert(did_remove);
                },
                .DuplicateShred => {
                    const did_remove = self.duplicate_shreds.swapRemove(entry_value.cursor_on_insertion);
                    std.debug.assert(did_remove);
                },
                else => {},
            }

            // remove from store
            {
                const did_remove = self.store.swapRemove(entry_key);
                std.debug.assert(did_remove);
            }
        }
    }
}
};

pub const HashTimeQueue = struct {
Expand Down Expand Up @@ -431,6 +535,30 @@ pub fn crds_overwrites(new_value: *const CrdsVersionedValue, old_value: *const C
}
}

// Fills a table with ten random values, trims it down to a capacity of five
// pubkeys and checks that the dropped half ended up in the purged queue.
test "gossip.crds_table: trim pruned values" {
    var table = try CrdsTable.init(std.testing.allocator);
    defer table.deinit();

    const signer = try KeyPair.create([_]u8{1} ** 32);

    const seed: u64 = @intCast(std.time.milliTimestamp());
    var prng = std.rand.DefaultPrng.init(seed);

    // populate: the test expects each value to be tracked under its own pubkey
    for (0..10) |_| {
        const data = CrdsData.random(prng.random());
        const signed_value = try CrdsValue.initSigned(data, signer);
        try table.insert(signed_value, 100);
    }

    // before trimming: everything is stored, nothing has been purged
    try std.testing.expectEqual(table.len(), 10);
    try std.testing.expectEqual(table.purged.len(), 0);
    try std.testing.expectEqual(table.pubkey_to_values.count(), 10);

    table.attempt_trim(5);

    // after trimming: half the pubkeys were dropped and recorded as purged
    try std.testing.expectEqual(table.len(), 5);
    try std.testing.expectEqual(table.pubkey_to_values.count(), 5);
    try std.testing.expectEqual(table.purged.len(), 5);
}

test "gossip.HashTimeQueue: trim pruned values" {
const keypair = try KeyPair.create([_]u8{1} ** 32);

Expand Down
Loading
Loading