diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 5e24282d3..d1e0593fd 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -17,7 +17,7 @@ jobs: - name: setup-zig uses: goto-bus-stop/setup-zig@v1 with: - version: 0.11.0-dev.3997+546212ff7 + version: 0.11.0 - name: lint run: | @@ -40,7 +40,7 @@ jobs: - name: setup-zig uses: goto-bus-stop/setup-zig@v1 with: - version: 0.11.0-dev.3997+546212ff7 + version: 0.11.0 - name: build run: zig build diff --git a/build.zig.zon b/build.zig.zon index 2afdd4ff0..6faba9e9f 100644 --- a/build.zig.zon +++ b/build.zig.zon @@ -7,16 +7,16 @@ .hash = "1220227d746ee3f156059a22b0044c6d8ad062a7cfb15b060cc2e7cbae4d0631f34e", }, .@"zig-network" = .{ - .url = "https://github.com/MasterQ32/zig-network/archive/7b5f76ea09626b96755c027bf4bd5b7e45297027.tar.gz", - .hash = "122090e7cb4459c2224399a45c56f47462733b919aa547c96b8c14ee705bfa22976e", + .url = "https://github.com/MasterQ32/zig-network/archive/02ffa0fc310ff0746f06c9bfb73b008ce4fca200.tar.gz", + .hash = "1220fe0d4ca564aa15f3fd7f9f84ba04e031bb70c3a1432257e0d35c74b183550c24", }, .@"zig-cli" = .{ .url = "https://github.com/sam701/zig-cli/archive/ea92821a797bdefb5df41ce602f8c7f387bcfb93.tar.gz", .hash = "122044525df0d1896556da08c5ed50b456a672a52ab86a65834bb1e27d3ae7a847f2", }, .getty = .{ - .url = "https://github.com/0xNineteen/getty/archive/04e1d75c3b4f122232f0b8efeaf296742cb5c262.tar.gz", - .hash = "122034c0905da94ef750e5c225b1b58810ee2d6370c776f81fdea383ec5217c275a2", + .url = "https://github.com/getty-zig/getty/archive/5b0e750d92ee4ef8e46ad743bb8ced63723acd00.tar.gz", + .hash = "12209398657d260abcd6dae946d8da4cd3057b8c7990608476a9f8011aae570d2ebb", }, }, } diff --git a/src/bincode/bincode.zig b/src/bincode/bincode.zig index 70d1f4d9e..d0b76188a 100644 --- a/src/bincode/bincode.zig +++ b/src/bincode/bincode.zig @@ -21,9 +21,7 @@ pub const Params = struct { include_fixed_array_length: bool = false, }; -pub fn deserializer(r: anytype, params: 
Params) blk: { - break :blk Deserializer(@TypeOf(r)); -} { +pub fn deserializer(r: anytype, params: Params) Deserializer(@TypeOf(r)) { return Deserializer(@TypeOf(r)).init(r, params); } @@ -211,20 +209,21 @@ pub fn Deserializer(comptime Reader: type) type { const params = dd.context.params; var data: T = undefined; - if (getStructSerializer(T)) |deser_fcn| { - data = try deser_fcn(alloc, T, reader, params); - return data; - } - inline for (info.fields) |field| { if (!field.is_comptime) { - if (shouldUseDefaultValue(T, field)) |val| { - @field(data, field.name) = @as(*const field.type, @ptrCast(@alignCast(val))).*; - } else if (getFieldDeserializer(T, field)) |deser_fcn| { - @field(data, field.name) = try (deser_fcn(alloc, field.type, reader, params) catch getty.de.Error.InvalidValue); - } else { - @field(data, field.name) = try getty.deserialize(alloc, field.type, dd); + if (get_field_config(T, field)) |config| { + if (shouldUseDefaultValue(field, config)) |default_val| { + @field(data, field.name) = @as(*const field.type, @ptrCast(@alignCast(default_val))).*; + continue; + } + + if (config.deserializer) |deser_fcn| { + @field(data, field.name) = deser_fcn(alloc, reader, params) catch return getty.de.Error.InvalidValue; + continue; + } } + + @field(data, field.name) = try getty.deserialize(alloc, field.type, dd); } } return data; @@ -288,9 +287,7 @@ pub fn free( } // for ref: https://github.com/getty-zig/json/blob/a5c4d9f996dc3f472267f6210c30f96c39da576b/src/ser/serializer.zig -pub fn serializer(w: anytype, params: Params) blk: { - break :blk Serializer(@TypeOf(w)); -} { +pub fn serializer(w: anytype, params: Params) Serializer(@TypeOf(w)) { return Serializer(@TypeOf(w)).init(w, params); } @@ -429,19 +426,19 @@ pub fn Serializer( var params = ss.context.params; var writer = ss.context.writer; - if (getStructSerializer(T)) |ser_fcn| { - return ser_fcn(writer, value, params); - } - inline for (info.fields) |field| { if (!field.is_comptime) { - if 
(!shouldSkipSerializingField(T, field)) { - if (getFieldSerializer(T, field)) |ser_fcn| { - try (ser_fcn(writer, @field(value, field.name), params) catch Error.IO); - } else { - try getty.serialize(alloc, @field(value, field.name), ss); + if (get_field_config(T, field)) |config| { + if (config.skip) { + continue; + } + if (config.serializer) |ser_fcn| { + ser_fcn(writer, @field(value, field.name), params) catch return Error.IO; + continue; } } + + try getty.serialize(alloc, @field(value, field.name), ss); } } }, @@ -485,72 +482,43 @@ pub fn Serializer( }; } -pub inline fn shouldUseDefaultValue(comptime parent_type: type, comptime struct_field: std.builtin.Type.StructField) ?*const anyopaque { - const parent_type_name = @typeName(parent_type); - - if (@hasDecl(parent_type, "!bincode-config:" ++ struct_field.name)) { - const config = @field(parent_type, "!bincode-config:" ++ struct_field.name); - if (config.skip and struct_field.default_value == null) { - @compileError("┓\n|\n|--> Invalid config: cannot skip field '" ++ parent_type_name ++ "." 
++ struct_field.name ++ "' deserialization if no default value set\n\n"); - } - return struct_field.default_value; - } - - return null; -} - -pub fn getFieldDeserializer(comptime parent_type: type, comptime struct_field: std.builtin.Type.StructField) ?DeserializeFunction { - if (@hasDecl(parent_type, "!bincode-config:" ++ struct_field.name)) { - const config = @field(parent_type, "!bincode-config:" ++ struct_field.name); - return config.deserializer; - } - return null; +// need this fn to define the return type T +pub fn DeserializeFunction(comptime T: type) type { + return fn (alloc: ?std.mem.Allocator, reader: anytype, params: Params) anyerror!T; } - pub const SerializeFunction = fn (writer: anytype, data: anytype, params: Params) anyerror!void; -pub const DeserializeFunction = fn (alloc: ?std.mem.Allocator, comptime T: type, reader: anytype, params: Params) anyerror!void; - -pub const FieldConfig = struct { - serializer: ?SerializeFunction = null, - deserializer: ?DeserializeFunction = null, - skip: bool = false, -}; -pub const StructConfig = struct { - serializer: ?SerializeFunction = null, - deserializer: ?DeserializeFunction = null, -}; - -pub fn getStructSerializer(comptime parent_type: type) ?SerializeFunction { - if (@hasDecl(parent_type, "!bincode-config")) { - const config = @field(parent_type, "!bincode-config"); - return config.serializer; - } - return null; +// ** Field Functions ** // +pub fn FieldConfig(comptime T: type) type { + return struct { + deserializer: ?DeserializeFunction(T) = null, + serializer: ?SerializeFunction = null, + skip: bool = false, + }; } -pub fn getFieldSerializer(comptime parent_type: type, comptime struct_field: std.builtin.Type.StructField) ?SerializeFunction { - if (@hasDecl(parent_type, "!bincode-config:" ++ struct_field.name)) { - const config = @field(parent_type, "!bincode-config:" ++ struct_field.name); - return config.serializer; +pub fn get_field_config(comptime struct_type: type, comptime field: 
std.builtin.Type.StructField) ?FieldConfig(field.type) { + const bincode_field = "!bincode-config:" ++ field.name; + if (@hasDecl(struct_type, bincode_field)) { + const config = @field(struct_type, bincode_field); + return config; } return null; } -pub inline fn shouldSkipSerializingField(comptime parent_type: type, comptime struct_field: std.builtin.Type.StructField) bool { - const parent_type_name = @typeName(parent_type); - - if (@hasDecl(parent_type, "!bincode-config:" ++ struct_field.name)) { - const config = @field(parent_type, "!bincode-config:" ++ struct_field.name); - if (config.skip and struct_field.default_value == null) { - @compileError("┓\n|\n|--> Invalid config: cannot skip field '" ++ parent_type_name ++ "." ++ struct_field.name ++ "' serialization if no default value set\n\n"); +pub inline fn shouldUseDefaultValue(comptime field: std.builtin.Type.StructField, comptime config: FieldConfig(field.type)) ?*const anyopaque { + if (config.skip) { + if (field.default_value == null) { + const field_type_name = @typeName(field.type); + @compileError("┓\n|\n|--> Invalid config: cannot skip field '" ++ field_type_name ++ "." 
++ field.name ++ "' deserialization if no default value set\n\n"); } - return config.skip; + return field.default_value; + } else { + return null; } - - return false; } +// ** Writer/Reader functions ** // pub fn writeToSlice(slice: []u8, data: anytype, params: Params) ![]u8 { var stream = std.io.fixedBufferStream(slice); var writer = stream.writer(); @@ -587,6 +555,66 @@ pub fn read(alloc: ?std.mem.Allocator, comptime T: type, reader: anytype, params return v; } +// ** Tests **// +fn TestSliceConfig(comptime Child: type) FieldConfig([]Child) { + const S = struct { + fn deserialize_test_slice(allocator: ?std.mem.Allocator, reader: anytype, params: Params) ![]Child { + var ally = allocator.?; + var len = try bincode.read(ally, u16, reader, params); + var elems = try ally.alloc(Child, len); + for (0..len) |i| { + elems[i] = try bincode.read(ally, Child, reader, params); + } + return elems; + } + + pub fn serilaize_test_slice(writer: anytype, data: anytype, params: bincode.Params) !void { + var len = std.math.cast(u16, data.len) orelse return error.DataTooLarge; + try bincode.write(null, writer, len, params); + for (data) |item| { + try bincode.write(null, writer, item, params); + } + return; + } + }; + + return FieldConfig([]Child){ + .serializer = S.serilaize_test_slice, + .deserializer = S.deserialize_test_slice, + }; +} + +test "bincode: custom field serialization" { + const Foo = struct { + accounts: []u8, + txs: []u32, + skip_me: u8 = 20, + + pub const @"!bincode-config:accounts" = TestSliceConfig(u8); + pub const @"!bincode-config:txs" = TestSliceConfig(u32); + pub const @"!bincode-config:skip_me" = FieldConfig(u8){ + .skip = true, + }; + }; + + var accounts = [_]u8{ 1, 2, 3 }; + var txs = [_]u32{ 1, 2, 3 }; + var foo = Foo{ .accounts = &accounts, .txs = &txs }; + + var buf: [1000]u8 = undefined; + var out = try writeToSlice(&buf, foo, Params{}); + std.debug.print("{any}", .{out}); + try std.testing.expect(out[out.len - 1] != 20); // skip worked + + var r = try 
readFromSlice(std.testing.allocator, Foo, out, Params{}); + defer free(std.testing.allocator, r); + std.debug.print("{any}", .{r}); + + try std.testing.expect(r.accounts.len == foo.accounts.len); + try std.testing.expect(r.txs.len == foo.txs.len); + try std.testing.expect(r.skip_me == 20); +} + test "bincode: test serialization" { var buf: [1]u8 = undefined; diff --git a/src/bloom/bloom.zig b/src/bloom/bloom.zig index 62563dda7..fc71a1ba7 100644 --- a/src/bloom/bloom.zig +++ b/src/bloom/bloom.zig @@ -9,6 +9,8 @@ const bincode = @import("../bincode/bincode.zig"); const FnvHasher = @import("../crypto/fnv.zig").FnvHasher; const testing = std.testing; +const RndGen = std.rand.DefaultPrng; + /// A bloom filter whose bitset is made up of u64 blocks pub const Bloom = struct { keys: ArrayList(u64), @@ -16,21 +18,25 @@ pub const Bloom = struct { num_bits_set: u64, pub const @"!bincode-config:keys" = ArrayListConfig(u64); - pub const @"!bincode-config:bits" = bincode.FieldConfig{ - .serializer = bincode_serialize_bit_vec, - .deserializer = bincode_deserialize_bit_vec, - }; + pub const @"!bincode-config:bits" = BitVecConfig(); const Self = @This(); - pub fn init(allocator: std.mem.Allocator, num_bits: u64) Self { + pub fn init(alloc: std.mem.Allocator, n_bits: u64, keys: ?ArrayList(u64)) Self { // need to be power of 2 for serialization to match rust - if (num_bits != 0) { - std.debug.assert((num_bits & (num_bits - 1) == 0)); + var bits = n_bits; + if (n_bits != 0) { + if (n_bits & (n_bits - 1) != 0) { + // nearest power of two + const _n_bits = std.math.pow(u64, 2, std.math.log2(n_bits) + 1); + std.debug.print("rounding n_bits of bloom from {any} to {any}\n", .{ n_bits, _n_bits }); + bits = _n_bits; + } } + return Self{ - .keys = ArrayList(u64).init(allocator), - .bits = DynamicBitSet.initEmpty(allocator, num_bits) catch unreachable, + .keys = keys orelse ArrayList(u64).init(alloc), + .bits = DynamicBitSet.initEmpty(alloc, n_bits) catch unreachable, .num_bits_set = 0, }; } 
@@ -40,6 +46,7 @@ pub const Bloom = struct { self.keys.deinit(); } + // used in tests pub fn add_key(self: *Self, key: u64) !void { try self.keys.append(key); } @@ -54,7 +61,18 @@ pub const Bloom = struct { } } - pub fn pos(self: *Self, bytes: []const u8, hash_index: u64) u64 { + pub fn contains(self: *const Self, key: []const u8) bool { + for (self.keys.items) |hash_index| { + var i = self.pos(key, hash_index); + if (self.bits.isSet(i)) { + continue; + } + return false; + } + return true; + } + + pub fn pos(self: *const Self, bytes: []const u8, hash_index: u64) u64 { return hash_at_index(bytes, hash_index) % @as(u64, self.bits.capacity()); } @@ -63,26 +81,84 @@ pub const Bloom = struct { hasher.update(bytes); return hasher.final(); } + + pub fn random(alloc: std.mem.Allocator, num_items: usize, false_rate: f64, max_bits: usize) !Self { + const n_items_f: f64 = @floatFromInt(num_items); + const m = Bloom.num_bits(n_items_f, false_rate); + const n_bits = @max(1, @min(@as(usize, @intFromFloat(m)), max_bits)); + const n_keys = Bloom.num_keys(@floatFromInt(n_bits), n_items_f); + + var seed = @as(u64, @intCast(std.time.milliTimestamp())); + var rnd = RndGen.init(seed); + + var keys = try ArrayList(u64).initCapacity(alloc, n_keys); + for (0..n_keys) |_| { + const v = rnd.random().int(u64); + keys.appendAssumeCapacity(v); + } + + return Bloom.init(alloc, n_bits, keys); + } + + fn num_bits(num_items: f64, false_rate: f64) f64 { + const n = num_items; + const p = false_rate; + const two: f64 = 2; + + // const d: f64 = -4.804530139182015e-01 + const d: f64 = @log(@as(f64, 1) / (std.math.pow(f64, two, @log(two)))); + return std.math.ceil((n * @log(p)) / d); + } + + fn num_keys(n_bits: f64, num_items: f64) usize { + const n = num_items; + const m = n_bits; + + if (n == 0) { + return 0; + } else { + return @intFromFloat(@max(@as(f64, 1), std.math.round((m / n) * @log(@as(f64, 2))))); + } + } }; -fn bincode_serialize_bit_vec(writer: anytype, data: anytype, params: 
bincode.Params) !void { - var bitset: DynamicBitSet = data; - var bitvec = BitVec.initFromBitSet(bitset); - try bincode.write(null, writer, bitvec, params); - return; +pub fn BitVecConfig() bincode.FieldConfig(DynamicBitSet) { + const S = struct { + pub fn serialize(writer: anytype, data: anytype, params: bincode.Params) !void { + var bitset: DynamicBitSet = data; + var bitvec = BitVec.initFromBitSet(bitset); + try bincode.write(null, writer, bitvec, params); + } + + pub fn deserialize(allocator: ?std.mem.Allocator, reader: anytype, params: bincode.Params) !DynamicBitSet { + var ally = allocator.?; + var bitvec = try bincode.read(ally, BitVec, reader, params); + defer bincode.free(ally, bitvec); + + var dynamic_bitset = try bitvec.toBitSet(ally); + return dynamic_bitset; + } + }; + + return bincode.FieldConfig(DynamicBitSet){ + .serializer = S.serialize, + .deserializer = S.deserialize, + }; } -fn bincode_deserialize_bit_vec(allocator: ?std.mem.Allocator, comptime T: type, reader: anytype, params: bincode.Params) !T { - var ally = allocator.?; - var bitvec = try bincode.read(ally, BitVec, reader, params); - defer bincode.free(ally, bitvec); +test "bloom: helper fcns match rust" { + const n_bits = Bloom.num_bits(100.2, 1e-5); + try testing.expectEqual(@as(f64, 2402), n_bits); - var dynamic_bitset = try bitvec.toBitSet(ally); - return dynamic_bitset; + const n_keys = Bloom.num_keys(100.2, 10); + try testing.expectEqual(@as(usize, 7), n_keys); + + var bloom = try Bloom.random(std.testing.allocator, 100, 0.1, 10000); + defer bloom.deinit(); } test "bloom: serializes/deserializes correctly" { - var bloom = Bloom.init(testing.allocator, 0); + var bloom = Bloom.init(testing.allocator, 0, null); var buf: [10000]u8 = undefined; var out = try bincode.writeToSlice(buf[0..], bloom, bincode.Params.standard); @@ -92,7 +168,7 @@ test "bloom: serializes/deserializes correctly" { } test "bloom: serializes/deserializes correctly with set bits" { - var bloom = 
Bloom.init(testing.allocator, 128); + var bloom = Bloom.init(testing.allocator, 128, null); try bloom.add_key(10); // required for memory leaks defer bloom.deinit(); @@ -108,7 +184,7 @@ test "bloom: serializes/deserializes correctly with set bits" { test "bloom: rust: serialized bytes equal rust (no keys)" { // note: need to init with len 2^i - var bloom = Bloom.init(testing.allocator, 128); + var bloom = Bloom.init(testing.allocator, 128, null); defer bloom.deinit(); try bloom.add_key(1); @@ -124,7 +200,7 @@ test "bloom: rust: serialized bytes equal rust (no keys)" { } test "bloom: rust: serialized bytes equal rust (multiple keys)" { - var bloom = Bloom.init(testing.allocator, 128); + var bloom = Bloom.init(testing.allocator, 128, null); defer bloom.deinit(); try bloom.add_key(1); diff --git a/src/core/hash.zig b/src/core/hash.zig index c33b65c97..744e94cf2 100644 --- a/src/core/hash.zig +++ b/src/core/hash.zig @@ -14,6 +14,20 @@ pub const Hash = struct { const Self = @This(); + // used in tests + pub fn random() Self { + var seed = @as(u64, @intCast(std.time.milliTimestamp())); + var rand = std.rand.DefaultPrng.init(seed); + var data: [HASH_SIZE]u8 = undefined; + + for (0..HASH_SIZE) |i| { + data[i] = rand.random().int(u8); + } + return Self{ + .data = data, + }; + } + pub fn generateSha256Hash(bytes: []const u8) Self { var hash = Hash{ .data = undefined, diff --git a/src/core/pubkey.zig b/src/core/pubkey.zig index fcb968f5c..dbe0cec66 100644 --- a/src/core/pubkey.zig +++ b/src/core/pubkey.zig @@ -66,11 +66,10 @@ pub const Pubkey = struct { } /// ***random*** generates a random pubkey. Optionally set `skip_encoding` to skip expensive base58 encoding. 
- pub fn random(options: struct { skip_encoding: bool = false, seed: ?u64 = null }) Self { + pub fn random(rng: std.rand.Random, options: struct { skip_encoding: bool = false }) Self { var bytes: [32]u8 = undefined; - var seed = options.seed orelse @as(u64, @intCast(std.time.milliTimestamp())); - var rand = std.rand.DefaultPrng.init(seed); - rand.fill(&bytes); + rng.bytes(&bytes); + var dest: [44]u8 = .{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, @@ -94,7 +93,7 @@ pub const Pubkey = struct { return Self.fromBytes(public_key.bytes[0..], .{ .skip_encoding = skip_bs58_encoding }) catch unreachable; } - pub const @"!bincode-config:cached_str" = bincode.FieldConfig{ .skip = true }; + pub const @"!bincode-config:cached_str" = bincode.FieldConfig(?[44]u8){ .skip = true }; pub const @"getty.sb" = struct { pub const attributes = .{ diff --git a/src/core/transaction.zig b/src/core/transaction.zig index 912d0fe55..d5f5be3e9 100644 --- a/src/core/transaction.zig +++ b/src/core/transaction.zig @@ -1,5 +1,5 @@ const std = @import("std"); -const shortvec_config = @import("../utils/shortvec.zig").shortvec_config; +const ShortVecConfig = @import("../utils/shortvec.zig").ShortVecConfig; const Signature = @import("signature.zig").Signature; const Pubkey = @import("pubkey.zig").Pubkey; const Hash = @import("hash.zig").Hash; @@ -8,7 +8,7 @@ pub const Transaction = struct { signatures: []Signature, message: Message, - pub const @"!bincode-config:signatures" = shortvec_config; + pub const @"!bincode-config:signatures" = ShortVecConfig(Signature); // used in tests pub fn default() Transaction { @@ -25,8 +25,8 @@ pub const Message = struct { recent_blockhash: Hash, instructions: []CompiledInstruction, - pub const @"!bincode-config:account_keys" = shortvec_config; - pub const @"!bincode-config:instructions" = shortvec_config; + pub const @"!bincode-config:account_keys" = ShortVecConfig(Pubkey); + pub const @"!bincode-config:instructions" 
= ShortVecConfig(CompiledInstruction); pub fn default() Message { return Message{ @@ -66,8 +66,8 @@ pub const CompiledInstruction = struct { /// The program input data. data: []u8, - pub const @"!bincode-config:accounts" = shortvec_config; - pub const @"!bincode-config:data" = shortvec_config; + pub const @"!bincode-config:accounts" = ShortVecConfig(u8); + pub const @"!bincode-config:data" = ShortVecConfig(u8); }; test "core.transaction: tmp" { diff --git a/src/gossip/cmd.zig b/src/gossip/cmd.zig index d46115f6f..e116ea756 100644 --- a/src/gossip/cmd.zig +++ b/src/gossip/cmd.zig @@ -60,7 +60,7 @@ pub fn runGossipService(gossip_port: u16, entrypoints: ArrayList(LegacyContactIn var spy = try ClusterInfo.initSpy(gpa_allocator, gossip_socket_addr, entrypoints, logger); - var gossip_service = GossipService.init(gpa_allocator, &spy.cluster_info, spy.gossip_socket, exit); + var gossip_service = try GossipService.init(gpa_allocator, &spy.cluster_info, spy.gossip_socket, exit); var handle = try std.Thread.spawn(.{}, GossipService.run, .{ &gossip_service, logger }); diff --git a/src/gossip/crds.zig b/src/gossip/crds.zig index f3dd081ac..e150fdc24 100644 --- a/src/gossip/crds.zig +++ b/src/gossip/crds.zig @@ -18,26 +18,6 @@ pub fn get_wallclock() u64 { return @intCast(std.time.milliTimestamp()); } -pub const CrdsFilter = struct { - filter: Bloom, - mask: u64, - mask_bits: u32, - - const Self = @This(); - - pub fn init(allocator: std.mem.Allocator) Self { - return Self{ - .filter = Bloom.init(allocator, 0), - .mask = 18_446_744_073_709_551_615, - .mask_bits = 0, - }; - } - - pub fn deinit(self: *Self) void { - self.filter.deinit(); - } -}; - pub const CrdsVersionedValue = struct { ordinal: u64, value: CrdsValue, @@ -69,6 +49,10 @@ pub const CrdsValue = struct { return self; } + pub fn random(rng: std.rand.Random, keypair: KeyPair) !Self { + return try Self.initSigned(CrdsData.random(rng), keypair); + } + pub fn sign(self: *Self, keypair: KeyPair) !void { var buf = [_]u8{0} ** 
1500; var bytes = try bincode.writeToSlice(&buf, self.data, bincode.Params.standard); @@ -237,8 +221,12 @@ pub const LegacyContactInfo = struct { const unspecified_addr = SocketAddr.init_ipv4(.{ 0, 0, 0, 0 }, 0); const wallclock = get_wallclock(); + var seed: u64 = @intCast(std.time.milliTimestamp()); + var rand = std.rand.DefaultPrng.init(seed); + const rng = rand.random(); + return LegacyContactInfo{ - .id = Pubkey.random(.{ .skip_encoding = true }), + .id = Pubkey.random(rng, .{ .skip_encoding = true }), .gossip = unspecified_addr, .tvu = unspecified_addr, .tvu_forwards = unspecified_addr, @@ -253,6 +241,24 @@ pub const LegacyContactInfo = struct { .shred_version = 0, }; } + + pub fn random(rng: std.rand.Random) LegacyContactInfo { + return LegacyContactInfo{ + .id = Pubkey.random(rng, .{ .skip_encoding = true }), + .gossip = SocketAddr.random(rng), + .tvu = SocketAddr.random(rng), + .tvu_forwards = SocketAddr.random(rng), + .repair = SocketAddr.random(rng), + .tpu = SocketAddr.random(rng), + .tpu_forwards = SocketAddr.random(rng), + .tpu_vote = SocketAddr.random(rng), + .rpc = SocketAddr.random(rng), + .rpc_pubsub = SocketAddr.random(rng), + .serve_repair = SocketAddr.random(rng), + .wallclock = get_wallclock(), + .shred_version = rng.int(u16), + }; + } }; pub const CrdsValueLabel = union(enum) { @@ -283,6 +289,24 @@ pub const CrdsData = union(enum(u32)) { DuplicateShred: struct { u16, DuplicateShred }, SnapshotHashes: SnapshotHashes, ContactInfo: ContactInfo, + + pub fn random(rng: std.rand.Random) CrdsData { + const v = rng.intRangeAtMost(u16, 0, 3); + switch (v) { + 0 => { + return CrdsData{ .LegacyContactInfo = LegacyContactInfo.random(rng) }; + }, + 1 => { + return CrdsData{ .EpochSlots = .{ rng.int(u8), EpochSlots.random(rng) } }; + }, + 2 => { + return CrdsData{ .Vote = .{ rng.int(u8), Vote.random(rng) } }; + }, + else => { + return CrdsData{ .DuplicateShred = .{ rng.int(u16), DuplicateShred.random(rng) } }; + }, + } + } }; pub const Vote = struct { @@ 
-291,7 +315,16 @@ pub const Vote = struct { wallclock: u64, slot: Slot = Slot.default(), - pub const @"!bincode-config:slot" = bincode.FieldConfig{ .skip = true }; + pub const @"!bincode-config:slot" = bincode.FieldConfig(Slot){ .skip = true }; + + pub fn random(rng: std.rand.Random) Vote { + return Vote{ + .from = Pubkey.random(rng, .{ .skip_encoding = true }), + .transaction = Transaction.default(), + .wallclock = get_wallclock(), + .slot = Slot.init(rng.int(u64)), + }; + } }; pub const LowestSlot = struct { @@ -327,6 +360,15 @@ pub const EpochSlots = struct { from: Pubkey, slots: []CompressedSlots, wallclock: u64, + + pub fn random(rng: std.rand.Random) EpochSlots { + var slice: [0]CompressedSlots = .{}; + return EpochSlots{ + .from = Pubkey.random(rng, .{ .skip_encoding = true }), + .slots = &slice, + .wallclock = get_wallclock(), + }; + } }; pub const CompressedSlots = union(enum(u32)) { @@ -384,7 +426,7 @@ pub const Version = struct { pub fn default(from: Pubkey) Self { return Self{ .from = from, - .wallclock = @intCast(std.time.milliTimestamp()), + .wallclock = get_wallclock(), .version = LegacyVersion2.CURRENT, }; } @@ -455,6 +497,20 @@ pub const DuplicateShred = struct { num_chunks: u8, chunk_index: u8, chunk: []u8, + + pub fn random(rng: std.rand.Random) DuplicateShred { + var slice = [_]u8{0} ** 32; + return DuplicateShred{ + .from = Pubkey.random(rng, .{ .skip_encoding = true }), + .wallclock = get_wallclock(), + .slot = Slot.init(rng.int(u64)), + .shred_index = rng.int(u32), + .shred_type = ShredType.Data, + .num_chunks = rng.int(u8), + .chunk_index = rng.int(u8), + .chunk = &slice, + }; + } }; pub const SnapshotHashes = struct { @@ -469,22 +525,10 @@ test "gossip.crds: test CrdsValue label() and id() methods" { var kp = try KeyPair.create(kp_bytes); const pk = kp.public_key; var id = Pubkey.fromPublicKey(&pk, true); - const unspecified_addr = SocketAddr.unspecified(); - var legacy_contact_info = LegacyContactInfo{ - .id = id, - .gossip = 
unspecified_addr, - .tvu = unspecified_addr, - .tvu_forwards = unspecified_addr, - .repair = unspecified_addr, - .tpu = unspecified_addr, - .tpu_forwards = unspecified_addr, - .tpu_vote = unspecified_addr, - .rpc = unspecified_addr, - .rpc_pubsub = unspecified_addr, - .serve_repair = unspecified_addr, - .wallclock = 0, - .shred_version = 0, - }; + + var legacy_contact_info = LegacyContactInfo.default(); + legacy_contact_info.id = id; + legacy_contact_info.wallclock = 0; var crds_value = try CrdsValue.initSigned(CrdsData{ .LegacyContactInfo = legacy_contact_info, @@ -494,16 +538,6 @@ test "gossip.crds: test CrdsValue label() and id() methods" { try std.testing.expect(crds_value.label().LegacyContactInfo.equals(&id)); } -test "gossip.crds: default crds filter matches rust bytes" { - const rust_bytes = [_]u8{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 255, 255, 255, 255, 255, 255, 0, 0, 0, 0 }; - var filter = CrdsFilter.init(std.testing.allocator); - defer filter.deinit(); - - var buf = [_]u8{0} ** 1024; - var bytes = try bincode.writeToSlice(buf[0..], filter, bincode.Params.standard); - try std.testing.expectEqualSlices(u8, rust_bytes[0..], bytes); -} - test "gossip.crds: pubkey matches rust" { var kp_bytes = [_]u8{1} ** 32; const kp = try KeyPair.create(kp_bytes); @@ -526,29 +560,15 @@ test "gossip.crds: contact info serialization matches rust" { const id = Pubkey.fromPublicKey(&pk, true); const gossip_addr = SocketAddr.init_ipv4(.{ 127, 0, 0, 1 }, 1234); - const unspecified_addr = SocketAddr.unspecified(); var buf = [_]u8{0} ** 1024; - var legacy_contact_info = LegacyContactInfo{ - .id = id, - .gossip = gossip_addr, - .tvu = unspecified_addr, - .tvu_forwards = unspecified_addr, - .repair = unspecified_addr, - .tpu = unspecified_addr, - .tpu_forwards = unspecified_addr, - .tpu_vote = unspecified_addr, - .rpc = unspecified_addr, - .rpc_pubsub = unspecified_addr, - .serve_repair = unspecified_addr, - .wallclock = 0, - 
.shred_version = 0, - }; + var legacy_contact_info = LegacyContactInfo.default(); + legacy_contact_info.gossip = gossip_addr; + legacy_contact_info.id = id; + legacy_contact_info.wallclock = 0; var contact_info_rust = [_]u8{ 138, 136, 227, 221, 116, 9, 241, 149, 253, 82, 219, 45, 60, 186, 93, 114, 202, 103, 9, 191, 29, 148, 18, 27, 243, 116, 136, 1, 180, 15, 111, 92, 0, 0, 0, 0, 127, 0, 0, 1, 210, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; - // var bytes = try bincode.writeToSlice(buf[0..], legacy_contact_info, bincode.Params.standard); - var bytes = try bincode.writeToSlice(buf[0..], legacy_contact_info, bincode.Params.standard); try std.testing.expectEqualSlices(u8, bytes[0..bytes.len], &contact_info_rust); } @@ -560,31 +580,52 @@ test "gossip.crds: crds data serialization matches rust" { const id = Pubkey.fromPublicKey(&pk, true); const gossip_addr = SocketAddr.init_ipv4(.{ 127, 0, 0, 1 }, 1234); - const unspecified_addr = SocketAddr.unspecified(); - var buf = [_]u8{0} ** 1024; - - var legacy_contact_info = LegacyContactInfo{ - .id = id, - .gossip = gossip_addr, - .tvu = unspecified_addr, - .tvu_forwards = unspecified_addr, - .repair = unspecified_addr, - .tpu = unspecified_addr, - .tpu_forwards = unspecified_addr, - .tpu_vote = unspecified_addr, - .rpc = unspecified_addr, - .rpc_pubsub = unspecified_addr, - .serve_repair = unspecified_addr, - .wallclock = 0, - .shred_version = 0, - }; + var legacy_contact_info = LegacyContactInfo.default(); + legacy_contact_info.gossip = gossip_addr; + legacy_contact_info.id = id; + legacy_contact_info.wallclock = 0; var crds_data = CrdsData{ .LegacyContactInfo = legacy_contact_info, }; + var buf = [_]u8{0} ** 1024; var rust_crds_data = [_]u8{ 0, 0, 0, 0, 138, 136, 227, 
221, 116, 9, 241, 149, 253, 82, 219, 45, 60, 186, 93, 114, 202, 103, 9, 191, 29, 148, 18, 27, 243, 116, 136, 1, 180, 15, 111, 92, 0, 0, 0, 0, 127, 0, 0, 1, 210, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; var bytes = try bincode.writeToSlice(buf[0..], crds_data, bincode.Params.standard); try std.testing.expectEqualSlices(u8, bytes[0..bytes.len], rust_crds_data[0..bytes.len]); } + +test "gossip.crds: random crds data" { + var seed: u64 = @intCast(std.time.milliTimestamp()); + var rand = std.rand.DefaultPrng.init(seed); + const rng = rand.random(); + + var buf: [1000]u8 = undefined; + + { + const data = LegacyContactInfo.random(rng); + const result = try bincode.writeToSlice(&buf, data, bincode.Params.standard); + _ = result; + } + { + const data = EpochSlots.random(rng); + const result = try bincode.writeToSlice(&buf, data, bincode.Params.standard); + _ = result; + } + { + const data = Vote.random(rng); + const result = try bincode.writeToSlice(&buf, data, bincode.Params.standard); + _ = result; + } + { + const data = DuplicateShred.random(rng); + const result = try bincode.writeToSlice(&buf, data, bincode.Params.standard); + _ = result; + } + { + const data = CrdsData.random(rng); + const result = try bincode.writeToSlice(&buf, data, bincode.Params.standard); + _ = result; + } +} diff --git a/src/gossip/crds_shards.zig b/src/gossip/crds_shards.zig new file mode 100644 index 000000000..aaa3dba5b --- /dev/null +++ b/src/gossip/crds_shards.zig @@ -0,0 +1,201 @@ +const std = @import("std"); +const AutoArrayHashMap = std.AutoArrayHashMap; +const AutoHashMap = std.AutoHashMap; + +const bincode = @import("../bincode/bincode.zig"); + +const Hash = @import("../core/hash.zig").Hash; + +const SocketAddr = 
@import("net.zig").SocketAddr; + +const crds = @import("./crds.zig"); +const CrdsValue = crds.CrdsValue; +const CrdsData = crds.CrdsData; +const CrdsVersionedValue = crds.CrdsVersionedValue; +const CrdsValueLabel = crds.CrdsValueLabel; +const LegacyContactInfo = crds.LegacyContactInfo; + +const Transaction = @import("../core/transaction.zig").Transaction; +const Pubkey = @import("../core/pubkey.zig").Pubkey; +const KeyPair = std.crypto.sign.Ed25519.KeyPair; +const RwLock = std.Thread.RwLock; + +const CrdsPull = @import("./pull_request.zig"); + +pub const CRDS_SHARDS_BITS: u32 = 12; +pub const CRDS_SHARDS_LEN: u32 = 1 << CRDS_SHARDS_BITS; + +pub const CrdsShards = struct { + // shards[k] includes crds values which the first shard_bits of their hash + // value is equal to k. Each shard is a mapping from crds values indices to + // their hash value. + shard_bits: u32 = CRDS_SHARDS_BITS, + shards: [CRDS_SHARDS_LEN]AutoArrayHashMap(usize, u64), + + const Self = @This(); + + pub fn init(alloc: std.mem.Allocator) !Self { + var shards: [CRDS_SHARDS_LEN]AutoArrayHashMap(usize, u64) = undefined; + for (0..CRDS_SHARDS_LEN) |i| { + shards[i] = AutoArrayHashMap(usize, u64).init(alloc); + } + + return Self{ + .shards = shards, + }; + } + + pub fn deinit(self: *Self) void { + for (0..self.shards.len) |i| { + self.shards[i].deinit(); + } + } + + pub fn insert(self: *Self, crds_index: usize, hash: *const Hash) !void { + const uhash = CrdsPull.hash_to_u64(hash); + const shard_index = CrdsShards.compute_shard_index(self.shard_bits, uhash); + const shard = &self.shards[shard_index]; + try shard.put(crds_index, uhash); + } + + pub fn remove(self: *Self, crds_index: usize, hash: *const Hash) !void { + const uhash = CrdsPull.hash_to_u64(hash); + const shard_index = CrdsShards.compute_shard_index(self.shard_bits, uhash); + const shard = &self.shards[shard_index]; + _ = shard.swapRemove(crds_index); + } + + pub fn compute_shard_index(shard_bits: u32, hash: u64) usize { + const shift_bits: 
u6 = @intCast(64 - shard_bits); + return @intCast(hash >> shift_bits); + } + + /// see filter_crds_values for more readable (but inefficient) version of what this fcn is doing + pub fn find(self: *const Self, alloc: std.mem.Allocator, mask: u64, mask_bits: u32) !std.ArrayList(usize) { + const ones = (~@as(u64, 0) >> @as(u6, @intCast(mask_bits))); + const match_mask = mask | ones; + + if (self.shard_bits < mask_bits) { + // shard_bits is smaller, all matches with mask will be in the same shard index + var shard = self.shards[CrdsShards.compute_shard_index(self.shard_bits, mask)]; + + var shard_iter = shard.iterator(); + var result = std.ArrayList(usize).init(alloc); + while (shard_iter.next()) |entry| { + const hash = entry.value_ptr.*; + + // see check_mask + if (hash | ones == match_mask) { + const index = entry.key_ptr.*; + try result.append(index); + } + } + return result; + } else if (self.shard_bits == mask_bits) { + // when bits are equal we know the lookup will be exact + var shard = self.shards[CrdsShards.compute_shard_index(self.shard_bits, mask)]; + + var result = try std.ArrayList(usize).initCapacity(alloc, shard.count()); + try result.insertSlice(0, shard.keys()); + return result; + } else { + // shardbits > maskbits + const shift_bits: u6 = @intCast(self.shard_bits - mask_bits); + const count: usize = @intCast(@as(u64, 1) << shift_bits); + const end = CrdsShards.compute_shard_index(self.shard_bits, match_mask) + 1; + + var result = std.ArrayList(usize).init(alloc); + var insert_index: usize = 0; + for ((end - count)..end) |shard_index| { + const shard = self.shards[shard_index]; + try result.insertSlice(insert_index, shard.keys()); + insert_index += shard.count(); + } + return result; + } + } +}; + +const CrdsTable = @import("crds_table.zig").CrdsTable; + +test "gossip.crds_shards: tests CrdsShards" { + var shards = try CrdsShards.init(std.testing.allocator); + defer shards.deinit(); + + const v = Hash.random(); + try shards.insert(10, &v); + try 
shards.remove(10, &v); + + const result = try shards.find(std.testing.allocator, 20, 10); + defer result.deinit(); +} + +// test helper fcns +fn new_test_crds_value(rng: std.rand.Random, crds_table: *CrdsTable) !CrdsVersionedValue { + const keypair = try KeyPair.create(null); + var value = try CrdsValue.random(rng, keypair); + try crds_table.insert(value, 0, null); + const label = value.label(); + const x = crds_table.get(label).?; + return x; +} + +fn check_mask(value: *const CrdsVersionedValue, mask: u64, mask_bits: u32) bool { + const uhash = CrdsPull.hash_to_u64(&value.value_hash); + const ones = (~@as(u64, 0) >> @as(u6, @intCast(mask_bits))); + return (uhash | ones) == (mask | ones); +} + +// does the same thing as find() but a lot more inefficient +fn filter_crds_values( + alloc: std.mem.Allocator, + values: []CrdsVersionedValue, + mask: u64, + mask_bits: u32, +) !std.AutoHashMap(usize, void) { + var result = std.AutoHashMap(usize, void).init(alloc); + for (values, 0..) |value, i| { + if (check_mask(&value, mask, mask_bits)) { + try result.put(i, {}); + } + } + return result; +} + +test "gossip.crds_shards: test shard find" { + var crds_table = try CrdsTable.init(std.testing.allocator); + defer crds_table.deinit(); + + // gen ranndom values + var values = try std.ArrayList(CrdsVersionedValue).initCapacity(std.testing.allocator, 1000); + defer values.deinit(); + + var seed: u64 = @intCast(std.time.milliTimestamp()); + var rand = std.rand.DefaultPrng.init(seed); + const rng = rand.random(); + + while (values.items.len < 1000) { + const value = try new_test_crds_value(rng, &crds_table); + try values.append(value); + } + + var crds_shards = crds_table.shards; + // test find with different mask bit sizes (< > == shard bits) + for (0..10) |_| { + var mask = rng.int(u64); + for (0..12) |mask_bits| { + var set = try filter_crds_values(std.testing.allocator, values.items, mask, @intCast(mask_bits)); + defer set.deinit(); + + var indexs = try 
crds_shards.find(std.testing.allocator, mask, @intCast(mask_bits)); + defer indexs.deinit(); + + try std.testing.expectEqual(set.count(), @as(u32, @intCast(indexs.items.len))); + + for (indexs.items) |index| { + _ = set.remove(index); + } + try std.testing.expectEqual(set.count(), 0); + } + } +} diff --git a/src/gossip/crds_table.zig b/src/gossip/crds_table.zig index 2b37f7fcf..847ad092e 100644 --- a/src/gossip/crds_table.zig +++ b/src/gossip/crds_table.zig @@ -10,6 +10,8 @@ const CompareResult = hash.CompareResult; const SocketAddr = @import("net.zig").SocketAddr; +const CrdsShards = @import("./crds_shards.zig").CrdsShards; + const crds = @import("./crds.zig"); const CrdsValue = crds.CrdsValue; const CrdsData = crds.CrdsData; @@ -20,18 +22,25 @@ const LegacyContactInfo = crds.LegacyContactInfo; const Transaction = @import("../core/transaction.zig").Transaction; const Pubkey = @import("../core/pubkey.zig").Pubkey; const KeyPair = std.crypto.sign.Ed25519.KeyPair; - -// tmp upperbound on number for `get_nodes`/`get_votes`/... -// enables stack allocations for buffers in the getter functions -const MAX_N_CONTACT_INFOS = 100; -const MAX_N_VOTES = 20; -const MAX_N_EPOCH_SLOTS = 20; -const MAX_N_DUP_SHREDS = 20; +const RwLock = std.Thread.RwLock; pub const CrdsError = error{ OldValue, + DuplicateValue, +}; + +pub const GossipRoute = enum { + LocalMessage, + PullRequest, + PullResponse, + PushMessage, }; +pub const HashAndTime = struct { hash: Hash, timestamp: u64 }; +// TODO: benchmark other structs? +const PurgedQ = std.TailQueue(HashAndTime); +const FailedInsertsQ = std.TailQueue(HashAndTime); + /// Cluster Replicated Data Store: stores gossip data /// the self.store uses an AutoArrayHashMap which is a HashMap that also allows for /// indexing values (value = arrayhashmap[0]). This allows us to insert data @@ -46,16 +55,30 @@ pub const CrdsError = error{ /// are found, the entry with the largest wallclock time (newest) is stored. 
pub const CrdsTable = struct { store: AutoArrayHashMap(CrdsValueLabel, CrdsVersionedValue), + + // special types tracked with their index contact_infos: AutoArrayHashMap(usize, void), // hashset for O(1) insertion/removal - shred_versions: AutoHashMap(Pubkey, u16), votes: AutoArrayHashMap(usize, usize), epoch_slots: AutoArrayHashMap(usize, usize), duplicate_shreds: AutoArrayHashMap(usize, usize), - cursor: usize, + shred_versions: AutoHashMap(Pubkey, u16), + + // used to build pull responses efficiently + shards: CrdsShards, + + // used when sending pull requests + purged: PurgedQ, + failed_inserts: FailedInsertsQ, + + // head of the store + cursor: usize = 0, + + // thread safe + lock: RwLock = .{}, const Self = @This(); - pub fn init(allocator: std.mem.Allocator) Self { + pub fn init(allocator: std.mem.Allocator) !Self { return Self{ .store = AutoArrayHashMap(CrdsValueLabel, CrdsVersionedValue).init(allocator), .contact_infos = AutoArrayHashMap(usize, void).init(allocator), @@ -63,7 +86,9 @@ pub const CrdsTable = struct { .votes = AutoArrayHashMap(usize, usize).init(allocator), .epoch_slots = AutoArrayHashMap(usize, usize).init(allocator), .duplicate_shreds = AutoArrayHashMap(usize, usize).init(allocator), - .cursor = 0, + .shards = try CrdsShards.init(allocator), + .purged = PurgedQ{}, + .failed_inserts = FailedInsertsQ{}, }; } @@ -74,11 +99,33 @@ pub const CrdsTable = struct { self.votes.deinit(); self.epoch_slots.deinit(); self.duplicate_shreds.deinit(); + self.shards.deinit(); + } + + pub fn write(self: *Self) void { + self.lock.lock(); + } + + pub fn release_write(self: *Self) void { + self.lock.unlock(); } - pub fn insert(self: *Self, value: CrdsValue, now: u64) !void { + pub fn read(self: *Self) void { + self.lock.lockShared(); + } + + pub fn release_read(self: *Self) void { + self.lock.unlockShared(); + } + + pub fn len(self: *Self) usize { + return self.store.count(); + } + + pub fn insert(self: *Self, value: CrdsValue, now: u64, maybe_route: 
?GossipRoute) !void { // TODO: check to make sure this sizing is correct or use heap - var buf = [_]u8{0} ** 2048; + + var buf = [_]u8{0} ** 2048; // does this break if its called in parallel? / dangle? var bytes = try bincode.writeToSlice(&buf, value, bincode.Params.standard); const value_hash = Hash.generateSha256Hash(bytes); const versioned_value = CrdsVersionedValue{ @@ -91,29 +138,33 @@ pub const CrdsTable = struct { const label = value.label(); var result = try self.store.getOrPut(label); + const entry_index = result.index; // entry doesnt exist if (!result.found_existing) { switch (value.data) { .LegacyContactInfo => |*info| { - try self.contact_infos.put(result.index, {}); + try self.contact_infos.put(entry_index, {}); try self.shred_versions.put(info.id, info.shred_version); }, .Vote => { - try self.votes.put(self.cursor, result.index); + try self.votes.put(self.cursor, entry_index); }, .EpochSlots => { - try self.epoch_slots.put(self.cursor, result.index); + try self.epoch_slots.put(self.cursor, entry_index); }, .DuplicateShred => { - try self.duplicate_shreds.put(self.cursor, result.index); + try self.duplicate_shreds.put(self.cursor, entry_index); }, else => {}, } - self.cursor += 1; + try self.shards.insert(entry_index, &versioned_value.value_hash); + result.value_ptr.* = versioned_value; + self.cursor += 1; + // should overwrite existing entry } else if (crds_overwrites(&versioned_value, result.value_ptr)) { const old_entry = result.value_ptr.*; @@ -125,34 +176,72 @@ pub const CrdsTable = struct { .Vote => { var did_remove = self.votes.swapRemove(old_entry.ordinal); std.debug.assert(did_remove); - try self.votes.put(self.cursor, result.index); + try self.votes.put(self.cursor, entry_index); }, .EpochSlots => { var did_remove = self.epoch_slots.swapRemove(old_entry.ordinal); std.debug.assert(did_remove); - try self.epoch_slots.put(self.cursor, result.index); + try self.epoch_slots.put(self.cursor, entry_index); }, .DuplicateShred => { var did_remove = 
self.duplicate_shreds.swapRemove(old_entry.ordinal); std.debug.assert(did_remove); - try self.duplicate_shreds.put(self.cursor, result.index); + try self.duplicate_shreds.put(self.cursor, entry_index); }, else => {}, } - self.cursor += 1; + // remove and insert to make sure the shard ordering is oldest-to-newest + // NOTE: do we need the ordering to be oldest-to-newest? + try self.shards.remove(entry_index, &old_entry.value_hash); + try self.shards.insert(entry_index, &versioned_value.value_hash); + + var node = PurgedQ.Node{ .data = HashAndTime{ + .hash = old_entry.value_hash, + .timestamp = now, + } }; + self.purged.append(&node); + result.value_ptr.* = versioned_value; + self.cursor += 1; + // do nothing } else { - return CrdsError.OldValue; + const old_entry = result.value_ptr.*; + + var node_data = HashAndTime{ + .hash = old_entry.value_hash, + .timestamp = now, + }; + + if (maybe_route) |route| { + if (route == GossipRoute.PullResponse) { + var failed_insert_node = FailedInsertsQ.Node{ .data = node_data }; + self.failed_inserts.append(&failed_insert_node); + } + } + + if (old_entry.value_hash.cmp(&versioned_value.value_hash) != CompareResult.Equal) { + // if hash isnt the same and override() is false then msg is old + var purged_node = PurgedQ.Node{ .data = node_data }; + self.purged.append(&purged_node); + + return CrdsError.OldValue; + } else { + // hash is the same then its a duplicate + return CrdsError.DuplicateValue; + } } } - pub fn get_votes_with_cursor(self: *Self, caller_cursor: *usize) ![]*CrdsVersionedValue { + // ** getter functions ** + pub fn get(self: *Self, label: CrdsValueLabel) ?CrdsVersionedValue { + return self.store.get(label); + } + + pub fn get_votes_with_cursor(self: *Self, buf: []*CrdsVersionedValue, caller_cursor: *usize) ![]*CrdsVersionedValue { const keys = self.votes.keys(); - // initialize this buffer once in struct and re-use on each call? 
- var buf: [MAX_N_VOTES]*CrdsVersionedValue = undefined; // max N votes per query (20) var index: usize = 0; for (keys) |key| { if (key < caller_cursor.*) { @@ -163,7 +252,7 @@ pub const CrdsTable = struct { buf[index] = &entry; index += 1; - if (index == MAX_N_VOTES) { + if (index == buf.len) { break; } } @@ -172,9 +261,8 @@ pub const CrdsTable = struct { return buf[0..index]; } - pub fn get_epoch_slots_with_cursor(self: *Self, caller_cursor: *usize) ![]*CrdsVersionedValue { + pub fn get_epoch_slots_with_cursor(self: *Self, buf: []*CrdsVersionedValue, caller_cursor: *usize) ![]*CrdsVersionedValue { const keys = self.epoch_slots.keys(); - var buf: [MAX_N_EPOCH_SLOTS]*CrdsVersionedValue = undefined; var index: usize = 0; for (keys) |key| { if (key < caller_cursor.*) { @@ -185,7 +273,7 @@ pub const CrdsTable = struct { buf[index] = &entry; index += 1; - if (index == MAX_N_EPOCH_SLOTS) { + if (index == buf.len) { break; } } @@ -194,9 +282,8 @@ pub const CrdsTable = struct { return buf[0..index]; } - pub fn get_duplicate_shreds_with_cursor(self: *Self, caller_cursor: *usize) ![]*CrdsVersionedValue { + pub fn get_duplicate_shreds_with_cursor(self: *Self, buf: []*CrdsVersionedValue, caller_cursor: *usize) ![]*CrdsVersionedValue { const keys = self.duplicate_shreds.keys(); - var buf: [MAX_N_DUP_SHREDS]*CrdsVersionedValue = undefined; var index: usize = 0; for (keys) |key| { if (key < caller_cursor.*) { @@ -207,7 +294,7 @@ pub const CrdsTable = struct { buf[index] = &entry; index += 1; - if (index == MAX_N_DUP_SHREDS) { + if (index == buf.len) { break; } } @@ -216,17 +303,86 @@ pub const CrdsTable = struct { return buf[0..index]; } - pub fn get_contact_infos(self: *const Self) ![]*CrdsVersionedValue { - var entry_ptrs: [MAX_N_CONTACT_INFOS]*CrdsVersionedValue = undefined; - const size = @min(self.contact_infos.count(), MAX_N_CONTACT_INFOS); + pub fn get_contact_infos(self: *const Self, buf: []*CrdsVersionedValue) ![]*CrdsVersionedValue { const store_values = 
self.store.iterator().values; const contact_indexs = self.contact_infos.iterator().keys; + + const size = @min(self.contact_infos.count(), buf.len); + for (0..size) |i| { const index = contact_indexs[i]; - const entry = &store_values[index]; - entry_ptrs[i] = entry; + const entry = &store_values[index]; // does this dangle? + buf[i] = entry; + } + return buf[0..size]; + } + + // ** shard getter fcns ** + pub fn get_bitmask_matches( + self: *const Self, + alloc: std.mem.Allocator, + mask: u64, + mask_bits: u64, + ) !std.ArrayList(usize) { + const indexs = try self.shards.find(alloc, mask, @intCast(mask_bits)); + return indexs; + } + + // ** purged values fcns ** + pub fn purged_len(self: *Self) usize { + return self.purged.len; + } + + pub fn get_purged_values(self: *Self, alloc: std.mem.Allocator) !std.ArrayList(Hash) { + var values = try std.ArrayList(Hash).initCapacity(alloc, self.purged.len); + + // collect all the hash values + var curr_ptr = self.purged.first; + while (curr_ptr) |curr| : (curr_ptr = curr.next) { + values.appendAssumeCapacity(curr.data.hash); + } + + return values; + } + + pub fn trim_purged_values(self: *Self, oldest_timestamp: u64) !void { + var curr_ptr = self.purged.first; + while (curr_ptr) |curr| : (curr_ptr = curr.next) { + const data_timestamp = curr.data.timestamp; + if (data_timestamp < oldest_timestamp) { + self.purged.remove(curr); + } else { + break; + } + } + } + + // ** failed insert values fcns ** + pub fn failed_inserts_len(self: *Self) usize { + return self.failed_inserts.len; + } + + pub fn get_failed_inserts_values(self: *Self, alloc: std.mem.Allocator) !std.ArrayList(Hash) { + var values = try std.ArrayList(Hash).initCapacity(alloc, self.failed_inserts.len); + + // collect all the hash values + var curr_ptr = self.failed_inserts.first; + while (curr_ptr) |curr| : (curr_ptr = curr.next) { + values.appendAssumeCapacity(curr.data.hash); + } + return values; + } + + pub fn trim_failed_inserts_values(self: *Self, 
oldest_timestamp: u64) !void { + var curr_ptr = self.failed_inserts.first; + while (curr_ptr) |curr| : (curr_ptr = curr.next) { + const data_timestamp = curr.data.timestamp; + if (data_timestamp < oldest_timestamp) { + self.failed_inserts.remove(curr); + } else { + break; + } } - return entry_ptrs[0..size]; } }; @@ -246,6 +402,25 @@ pub fn crds_overwrites(new_value: *const CrdsVersionedValue, old_value: *const C } } +test "gossip.crds_table: insert and get" { + const keypair = try KeyPair.create([_]u8{1} ** 32); + + var seed: u64 = @intCast(std.time.milliTimestamp()); + var rand = std.rand.DefaultPrng.init(seed); + const rng = rand.random(); + + var value = try CrdsValue.random(rng, keypair); + + var crds_table = try CrdsTable.init(std.testing.allocator); + defer crds_table.deinit(); + + try crds_table.insert(value, 0, null); + + const label = value.label(); + const x = crds_table.get(label).?; + _ = x; +} + test "gossip.crds_table: insert and get votes" { var kp_bytes = [_]u8{1} ** 32; const kp = try KeyPair.create(kp_bytes); @@ -257,75 +432,69 @@ test "gossip.crds_table: insert and get votes" { .Vote = .{ 0, vote }, }, kp); - var crds_table = CrdsTable.init(std.testing.allocator); + var crds_table = try CrdsTable.init(std.testing.allocator); defer crds_table.deinit(); - try crds_table.insert(crds_value, 0); + try crds_table.insert(crds_value, 0, null); var cursor: usize = 0; - var votes = try crds_table.get_votes_with_cursor(&cursor); + var buf: [100]*CrdsVersionedValue = undefined; + var votes = try crds_table.get_votes_with_cursor(&buf, &cursor); try std.testing.expect(votes.len == 1); try std.testing.expect(cursor == 1); // try inserting another vote - id = Pubkey.random(.{}); + var seed: u64 = @intCast(std.time.milliTimestamp()); + var rand = std.rand.DefaultPrng.init(seed); + const rng = rand.random(); + id = Pubkey.random(rng, .{}); vote = crds.Vote{ .from = id, .transaction = Transaction.default(), .wallclock = 10 }; crds_value = try 
CrdsValue.initSigned(CrdsData{ .Vote = .{ 0, vote }, }, kp); - try crds_table.insert(crds_value, 1); + try crds_table.insert(crds_value, 1, null); - votes = try crds_table.get_votes_with_cursor(&cursor); + votes = try crds_table.get_votes_with_cursor(&buf, &cursor); try std.testing.expect(votes.len == 1); try std.testing.expect(cursor == 2); + + const v = try crds_table.get_bitmask_matches(std.testing.allocator, 10, 1); + defer v.deinit(); } test "gossip.crds_table: insert and get contact_info" { - var kp_bytes = [_]u8{1} ** 32; - const kp = try KeyPair.create(kp_bytes); - const pk = kp.public_key; - var id = Pubkey.fromPublicKey(&pk, true); - const unspecified_addr = SocketAddr.unspecified(); - var legacy_contact_info = crds.LegacyContactInfo{ - .id = id, - .gossip = unspecified_addr, - .tvu = unspecified_addr, - .tvu_forwards = unspecified_addr, - .repair = unspecified_addr, - .tpu = unspecified_addr, - .tpu_forwards = unspecified_addr, - .tpu_vote = unspecified_addr, - .rpc = unspecified_addr, - .rpc_pubsub = unspecified_addr, - .serve_repair = unspecified_addr, - .wallclock = 0, - .shred_version = 0, - }; + const kp = try KeyPair.create([_]u8{1} ** 32); + var id = Pubkey.fromPublicKey(&kp.public_key, true); + + var legacy_contact_info = crds.LegacyContactInfo.default(); + legacy_contact_info.id = id; var crds_value = try CrdsValue.initSigned(CrdsData{ .LegacyContactInfo = legacy_contact_info, }, kp); - var crds_table = CrdsTable.init(std.testing.allocator); + var crds_table = try CrdsTable.init(std.testing.allocator); defer crds_table.deinit(); // test insertion - try crds_table.insert(crds_value, 0); + try crds_table.insert(crds_value, 0, null); // test retrieval - var nodes = try crds_table.get_contact_infos(); + var buf: [100]*CrdsVersionedValue = undefined; + var nodes = try crds_table.get_contact_infos(&buf); try std.testing.expect(nodes.len == 1); try std.testing.expect(nodes[0].value.data.LegacyContactInfo.id.equals(&id)); // test re-insertion - const 
result = crds_table.insert(crds_value, 0); - try std.testing.expectError(CrdsError.OldValue, result); + const result = crds_table.insert(crds_value, 0, null); + try std.testing.expectError(CrdsError.DuplicateValue, result); // test re-insertion with greater wallclock - crds_value.data.LegacyContactInfo.wallclock = 2; - try crds_table.insert(crds_value, 0); + crds_value.data.LegacyContactInfo.wallclock += 2; + const v = crds_value.data.LegacyContactInfo.wallclock; + try crds_table.insert(crds_value, 0, null); // check retrieval - nodes = try crds_table.get_contact_infos(); + nodes = try crds_table.get_contact_infos(&buf); try std.testing.expect(nodes.len == 1); - try std.testing.expect(nodes[0].value.data.LegacyContactInfo.wallclock == 2); + try std.testing.expect(nodes[0].value.data.LegacyContactInfo.wallclock == v); } diff --git a/src/gossip/gossip_service.zig b/src/gossip/gossip_service.zig index 8b328cd5e..0ef13f3cc 100644 --- a/src/gossip/gossip_service.zig +++ b/src/gossip/gossip_service.zig @@ -22,6 +22,10 @@ const _crds_table = @import("../gossip/crds_table.zig"); const CrdsTable = _crds_table.CrdsTable; const CrdsError = _crds_table.CrdsError; const Logger = @import("../trace/log.zig").Logger; +const GossipRoute = _crds_table.GossipRoute; + +const crds_pull_request = @import("../gossip/pull_request.zig"); +const crds_pull_response = @import("../gossip/pull_response.zig"); var gpa_allocator = std.heap.GeneralPurposeAllocator(.{}){}; var gpa = gpa_allocator.allocator(); @@ -29,7 +33,9 @@ var gpa = gpa_allocator.allocator(); const PacketChannel = Channel(Packet); // const ProtocolChannel = Channel(Protocol); +const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000; const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 30000; +const FAILED_INSERTS_RETENTION_MS: u64 = 20_000; pub const GossipService = struct { cluster_info: *ClusterInfo, @@ -38,6 +44,10 @@ pub const GossipService = struct { packet_channel: PacketChannel, responder_channel: PacketChannel, crds_table: 
CrdsTable, + allocator: std.mem.Allocator, + + pull_timeout: u64 = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, + push_msg_timeout: u64 = CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS, const Self = @This(); @@ -46,10 +56,10 @@ pub const GossipService = struct { cluster_info: *ClusterInfo, gossip_socket: UdpSocket, exit: AtomicBool, - ) Self { + ) !Self { var packet_channel = PacketChannel.init(allocator, 10000); var responder_channel = PacketChannel.init(allocator, 10000); - var crds_table = CrdsTable.init(allocator); + var crds_table = try CrdsTable.init(allocator); return Self{ .cluster_info = cluster_info, @@ -58,6 +68,7 @@ pub const GossipService = struct { .packet_channel = packet_channel, .responder_channel = responder_channel, .crds_table = crds_table, + .allocator = allocator, }; } @@ -98,6 +109,24 @@ pub const GossipService = struct { try self.send_ping(&peer, logger); try self.push_contact_info(&peer); + // generate pull requests + var filters = crds_pull_request.build_crds_filters(self.allocator, &self.crds_table, crds_pull_request.MAX_BLOOM_SIZE) catch { + // TODO: handle this -- crds store not enough data? 
+ std.time.sleep(std.time.ns_per_s * 1); + continue; + }; + defer crds_pull_request.deinit_crds_filters(&filters); + // TODO: send em out + + // trim CRDS values + const now = get_wallclock(); + // recently purged values + const purged_cutoff_timestamp = now -| (5 * self.pull_timeout); + try self.crds_table.trim_purged_values(purged_cutoff_timestamp); + // failed insertions + const failed_insert_cutoff_timestamp = now -| FAILED_INSERTS_RETENTION_MS; + try self.crds_table.trim_failed_inserts_values(failed_insert_cutoff_timestamp); + std.time.sleep(std.time.ns_per_s * 1); } } @@ -205,8 +234,32 @@ pub const GossipService = struct { }, .PushMessage => |*push| { logger.debugf("got a push message: {any}", .{protocol_message}); + // TODO: verify value matches pubkey const values = push[1]; - handle_push_message(crds_table, values, logger); + insert_crds_values(crds_table, values, logger, GossipRoute.PushMessage); + }, + .PullResponse => |*pull| { + logger.debugf("got a pull message: {any}", .{protocol_message}); + // TODO: verify value matches pubkey + const values = pull[1]; + insert_crds_values(crds_table, values, logger, GossipRoute.PullResponse); + }, + .PullRequest => |*pull| { + // TODO: verify value matches pubkey + var filter = pull[0]; + var value = pull[1]; // contact info + const now = get_wallclock(); + + const crds_values = try crds_pull_response.filter_crds_values( + allocator, + crds_table, + &value, + &filter, + 100, + now, + ); + // TODO: send them out as a pull response + _ = crds_values; }, else => { logger.debugf("got a protocol message: {any}", .{protocol_message}); @@ -215,9 +268,12 @@ pub const GossipService = struct { } } - pub fn handle_push_message(crds_table: *CrdsTable, values: []crds.CrdsValue, logger: *Logger) void { + pub fn insert_crds_values(crds_table: *CrdsTable, values: []crds.CrdsValue, logger: *Logger, route: GossipRoute) void { var now = get_wallclock(); + crds_table.write(); + defer crds_table.release_write(); + for (values) |value| 
{ const value_time = value.wallclock(); const is_too_new = value_time > now +| CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS; @@ -226,7 +282,7 @@ pub const GossipService = struct { continue; } - crds_table.insert(value, now) catch |err| switch (err) { + crds_table.insert(value, now, route) catch |err| switch (err) { CrdsError.OldValue => { logger.debugf("failed to insert into crds: {any}", .{value}); }, @@ -240,7 +296,7 @@ pub const GossipService = struct { test "gossip.gossip_service: process contact_info push packet" { const allocator = std.testing.allocator; - var crds_table = CrdsTable.init(allocator); + var crds_table = try CrdsTable.init(allocator); defer crds_table.deinit(); var packet_channel = PacketChannel.init(allocator, 100); @@ -274,12 +330,12 @@ test "gossip.gossip_service: process contact_info push packet" { var buf = [_]u8{0} ** PACKET_DATA_SIZE; var bytes = try bincode.writeToSlice(buf[0..], msg, bincode.Params.standard); const packet = Packet.init(peer, buf, bytes.len); - packet_channel.send(packet); // correct insertion into table + var buf2: [100]*crds.CrdsVersionedValue = undefined; std.time.sleep(std.time.ns_per_s); - var res = try crds_table.get_contact_infos(); + var res = try crds_table.get_contact_infos(&buf2); try std.testing.expect(res.len == 1); packet_channel.close(); diff --git a/src/gossip/imgs/2023-08-07-15-49-02.png b/src/gossip/imgs/2023-08-07-15-49-02.png new file mode 100644 index 000000000..16f721ec1 Binary files /dev/null and b/src/gossip/imgs/2023-08-07-15-49-02.png differ diff --git a/src/gossip/imgs/2023-08-07-15-49-57.png b/src/gossip/imgs/2023-08-07-15-49-57.png new file mode 100644 index 000000000..37dd4dcd8 Binary files /dev/null and b/src/gossip/imgs/2023-08-07-15-49-57.png differ diff --git a/src/gossip/imgs/2023-08-07-17-15-29.png b/src/gossip/imgs/2023-08-07-17-15-29.png new file mode 100644 index 000000000..aacd64c10 Binary files /dev/null and b/src/gossip/imgs/2023-08-07-17-15-29.png differ diff --git 
a/src/gossip/imgs/2023-08-08-12-15-44.png b/src/gossip/imgs/2023-08-08-12-15-44.png new file mode 100644 index 000000000..dfc2966a2 Binary files /dev/null and b/src/gossip/imgs/2023-08-08-12-15-44.png differ diff --git a/src/gossip/imgs/2023-08-08-12-35-30.png b/src/gossip/imgs/2023-08-08-12-35-30.png new file mode 100644 index 000000000..322941105 Binary files /dev/null and b/src/gossip/imgs/2023-08-08-12-35-30.png differ diff --git a/src/gossip/imgs/2023-08-08-13-37-04.png b/src/gossip/imgs/2023-08-08-13-37-04.png new file mode 100644 index 000000000..778d404b0 Binary files /dev/null and b/src/gossip/imgs/2023-08-08-13-37-04.png differ diff --git a/src/gossip/imgs/2023-08-08-13-42-47.png b/src/gossip/imgs/2023-08-08-13-42-47.png new file mode 100644 index 000000000..94bab71ed Binary files /dev/null and b/src/gossip/imgs/2023-08-08-13-42-47.png differ diff --git a/src/gossip/net.zig b/src/gossip/net.zig index bc4581f79..81e06ff0d 100644 --- a/src/gossip/net.zig +++ b/src/gossip/net.zig @@ -16,6 +16,27 @@ pub const SocketAddr = union(enum(u8)) { }, }; + pub fn random(rng: std.rand.Random) Self { + const pport = rng.int(u16); + + var version = rng.int(u8); + if (version % 2 == 0) { + var octets: [4]u8 = undefined; + rng.bytes(&octets); + + return Self{ + .V4 = .{ .ip = Ipv4Addr.init(octets[0], octets[1], octets[2], octets[3]), .port = pport }, + }; + } else { + var octets: [16]u8 = undefined; + rng.bytes(&octets); + + return Self{ + .V6 = .{ .ip = Ipv6Addr.init(octets), .port = pport, .flowinfo = 0, .scope_id = 0 }, + }; + } + } + pub fn init_ipv4(octets: [4]u8, portt: u16) Self { return Self{ .V4 = .{ .ip = Ipv4Addr.init(octets[0], octets[1], octets[2], octets[3]), .port = portt }, @@ -177,3 +198,9 @@ pub const IpAddr = union(enum(u32)) { } } }; + +test "gossip.net: test random" { + var rng = std.rand.DefaultPrng.init(@intCast(std.time.milliTimestamp())); + var addr = SocketAddr.random(rng.random()); + _ = addr; +} diff --git a/src/gossip/node.zig 
b/src/gossip/node.zig index 9b384d4c0..b5cf78865 100644 --- a/src/gossip/node.zig +++ b/src/gossip/node.zig @@ -3,11 +3,14 @@ const Pubkey = @import("../core/pubkey.zig").Pubkey; const network = @import("zig-network"); const Version = @import("../version/version.zig").Version; const bincode = @import("../bincode/bincode.zig"); -const varint_config = @import("../utils/varint.zig").varint_config; + +const var_int = @import("../utils/varint.zig"); +const var_int_config_u16 = var_int.var_int_config_u16; +const var_int_config_u64 = var_int.var_int_config_u64; + const serialize_varint = @import("../utils/varint.zig").serilaize_varint; const deserialize_varint = @import("../utils/varint.zig").deserialize_varint; -const shortvec_config = @import("../utils/shortvec.zig").shortvec_config; -const ShortVecArrayListConfig = @import("../utils/shortvec_arraylist.zig").ShortVecArrayListConfig; +const ShortVecArrayListConfig = @import("../utils/shortvec.zig").ShortVecArrayListConfig; const SocketAddr = @import("net.zig").SocketAddr; const IpAddr = @import("net.zig").IpAddr; const gossip = @import("sig").gossip; @@ -52,10 +55,10 @@ pub const ContactInfo = struct { sockets: ArrayList(SocketEntry), cache: [SOCKET_CACHE_SIZE]SocketAddr = socket_addrs_unspecified(), - pub const @"!bincode-config:cache" = bincode.FieldConfig{ .skip = true }; + pub const @"!bincode-config:cache" = bincode.FieldConfig([SOCKET_CACHE_SIZE]SocketAddr){ .skip = true }; pub const @"!bincode-config:addrs" = ShortVecArrayListConfig(IpAddr); pub const @"!bincode-config:sockets" = ShortVecArrayListConfig(SocketEntry); - pub const @"!bincode-config:wallclock" = varint_config; + pub const @"!bincode-config:wallclock" = var_int_config_u64; const Self = @This(); @@ -242,7 +245,7 @@ pub const SocketEntry = struct { index: u8, // IpAddr index in the accompanying addrs vector. offset: u16, // Port offset with respect to the previous entry. 
- pub const @"!bincode-config:offset" = varint_config; + pub const @"!bincode-config:offset" = var_int_config_u16; const Self = @This(); @@ -282,7 +285,11 @@ fn socket_addrs_unspecified() [13]SocketAddr { const logger = std.log.scoped(.node_tests); test "new contact info" { - var ci = ContactInfo.init(testing.allocator, Pubkey.random(.{}), @as(u64, @intCast(std.time.microTimestamp())), 1000); + var seed: u64 = @intCast(std.time.milliTimestamp()); + var rand = std.rand.DefaultPrng.init(seed); + const rng = rand.random(); + + var ci = ContactInfo.init(testing.allocator, Pubkey.random(rng, .{}), @as(u64, @intCast(std.time.microTimestamp())), 1000); defer ci.deinit(); } @@ -307,7 +314,11 @@ test "socketaddr bincode serialize matches rust" { } test "set & get socket on contact info" { - var ci = ContactInfo.init(testing.allocator, Pubkey.random(.{}), @as(u64, @intCast(std.time.microTimestamp())), 1000); + var seed: u64 = @intCast(std.time.milliTimestamp()); + var rand = std.rand.DefaultPrng.init(seed); + const rng = rand.random(); + + var ci = ContactInfo.init(testing.allocator, Pubkey.random(rng, .{}), @as(u64, @intCast(std.time.microTimestamp())), 1000); defer ci.deinit(); try ci.setSocket(SOCKET_TAG_RPC, SocketAddr.init_ipv4(.{ 127, 0, 0, 1 }, 8899)); diff --git a/src/gossip/protocol.zig b/src/gossip/protocol.zig index c26e84adc..8be33efb2 100644 --- a/src/gossip/protocol.zig +++ b/src/gossip/protocol.zig @@ -9,11 +9,13 @@ const SocketAddr = @import("net.zig").SocketAddr; const crds = @import("crds.zig"); const CrdsValue = crds.CrdsValue; const CrdsData = crds.CrdsData; -const CrdsFilter = crds.CrdsFilter; const Version = crds.Version; const LegacyVersion2 = crds.LegacyVersion2; const LegacyContactInfo = crds.LegacyContactInfo; +const pull_import = @import("pull_request.zig"); +const CrdsFilter = pull_import.CrdsFilter; + const Option = @import("../option.zig").Option; const DefaultPrng = std.rand.DefaultPrng; const KeyPair = std.crypto.sign.Ed25519.KeyPair; diff 
--git a/src/gossip/pull_request.zig b/src/gossip/pull_request.zig new file mode 100644 index 000000000..2f7dcd03e --- /dev/null +++ b/src/gossip/pull_request.zig @@ -0,0 +1,322 @@ +const std = @import("std"); +const SocketAddr = @import("net.zig").SocketAddr; +const Tuple = std.meta.Tuple; +const Hash = @import("../core/hash.zig").Hash; +const Signature = @import("../core/signature.zig").Signature; +const Transaction = @import("../core/transaction.zig").Transaction; +const Slot = @import("../core/slot.zig").Slot; +const Option = @import("../option.zig").Option; +const ContactInfo = @import("node.zig").ContactInfo; +const bincode = @import("../bincode/bincode.zig"); +const ArrayList = std.ArrayList; +const ArrayListConfig = @import("../utils/arraylist.zig").ArrayListConfig; +const Bloom = @import("../bloom/bloom.zig").Bloom; +const KeyPair = std.crypto.sign.Ed25519.KeyPair; +const Pubkey = @import("../core/pubkey.zig").Pubkey; +const exp = std.math.exp; + +const CrdsTable = @import("crds_table.zig").CrdsTable; +const crds = @import("crds.zig"); +const CrdsValue = crds.CrdsValue; + +pub const MAX_CRDS_OBJECT_SIZE: usize = 928; +pub const MAX_BLOOM_SIZE: usize = MAX_CRDS_OBJECT_SIZE; + +pub const MAX_NUM_PULL_REQUESTS: usize = 20; // labs - 1024; +pub const FALSE_RATE: f64 = 0.1; +pub const KEYS: f64 = 8; + +/// parses all the values in the crds table and returns a list of +/// corresponding filters. Note: make sure to call deinit_crds_filters. 
+pub fn build_crds_filters( + alloc: std.mem.Allocator, + crds_table: *CrdsTable, + bloom_size: usize, +) !ArrayList(CrdsFilter) { + crds_table.read(); + defer crds_table.release_read(); + + const num_items = crds_table.len() + crds_table.purged_len() + crds_table.failed_inserts_len(); + + var filter_set = try CrdsFilterSet.init(alloc, num_items, bloom_size); + + // add all crds values + const crds_values = crds_table.store.iterator().values; + for (0..crds_table.len()) |i| { + const hash = crds_values[i].value_hash; + filter_set.add(&hash); + } + // add purged values + const purged_values = try crds_table.get_purged_values(alloc); + for (purged_values.items) |hash| { + filter_set.add(&hash); + } + // add failed inserts + const failed_inserts = try crds_table.get_failed_inserts_values(alloc); + for (failed_inserts.items) |hash| { + filter_set.add(&hash); + } + + // note: filter set is deinit() in this fcn + const filters = try filter_set.consume_for_crds_filters(alloc, MAX_NUM_PULL_REQUESTS); + return filters; +} + +pub fn deinit_crds_filters(filters: *ArrayList(CrdsFilter)) void { + for (filters.items) |*filter| { + filter.deinit(); + } + filters.deinit(); +} + +pub const CrdsFilterSet = struct { + filters: ArrayList(Bloom), + + // mask bits represents the number of bits required to represent the number of + // filters. + mask_bits: u32, // todo: make this a u6 + + const Self = @This(); + + pub fn init(alloc: std.mem.Allocator, num_items: usize, bloom_size_bytes: usize) !Self { + var bloom_size_bits: f64 = @floatFromInt(bloom_size_bytes * 8); + // mask_bits = log2(..) 
number of filters + var mask_bits = CrdsFilter.compute_mask_bits(@floatFromInt(num_items), bloom_size_bits); + std.debug.assert(mask_bits > 0); + + const n_filters: usize = @intCast(@as(u64, 1) << @as(u6, @intCast(mask_bits))); + std.debug.assert(n_filters > 0); + + // TODO; add errdefer handling here + var max_items = CrdsFilter.compute_max_items(bloom_size_bits, FALSE_RATE, KEYS); + var filters = try ArrayList(Bloom).initCapacity(alloc, n_filters); + for (0..n_filters) |_| { + var filter = try Bloom.random(alloc, @intFromFloat(max_items), FALSE_RATE, @intFromFloat(bloom_size_bits)); + filters.appendAssumeCapacity(filter); + } + + return Self{ + .filters = filters, + .mask_bits = mask_bits, + }; + } + + /// note: does not free filter values bc we take ownership of them in + /// getCrdsFilters + pub fn deinit(self: *Self) void { + self.filters.deinit(); + } + + pub fn hash_index(mask_bits: u32, hash: *const Hash) usize { + // 64 = u64 bits + const shift_bits: u6 = @intCast(64 - mask_bits); + // only look at the first `mask_bits` bits + // which represents `n_filters` number of indexs + const index = @as(usize, hash_to_u64(hash) >> shift_bits); + return index; + } + + pub fn add(self: *Self, hash: *const Hash) void { + const index = CrdsFilterSet.hash_index(self.mask_bits, hash); + self.filters.items[index].add(&hash.data); + } + + pub fn len(self: Self) usize { + return self.filters.items.len; + } + + /// returns a list of CrdsFilters and consumes Self by calling deinit. + pub fn consume_for_crds_filters(self: *Self, alloc: std.mem.Allocator, max_size: usize) !ArrayList(CrdsFilter) { + defer self.deinit(); // ! 
+ + const set_size = self.len(); + var indexs = try ArrayList(usize).initCapacity(alloc, set_size); + defer indexs.deinit(); + + for (0..set_size) |i| { + indexs.appendAssumeCapacity(i); + } + + const output_size = @min(set_size, max_size); + const can_consume_all = max_size >= set_size; + + if (!can_consume_all) { + // shuffle the indexs + var seed = @as(u64, @intCast(std.time.milliTimestamp())); + var rand = std.rand.DefaultPrng.init(seed); + for (0..output_size) |i| { + const j = @min(set_size, @max(0, rand.random().int(usize))); + std.mem.swap(usize, &indexs.items[i], &indexs.items[j]); + } + + // release others + for (output_size..set_size) |i| { + const index = indexs.items[i]; + self.filters.items[index].deinit(); + } + } + + var output = try ArrayList(CrdsFilter).initCapacity(alloc, output_size); + for (0..output_size) |i| { + const index = indexs.items[i]; + + var output_item = CrdsFilter{ + .filter = self.filters.items[index], // take ownership of filter + .mask = CrdsFilter.compute_mask(index, self.mask_bits), + .mask_bits = self.mask_bits, + }; + output.appendAssumeCapacity(output_item); + } + + return output; + } +}; + +pub const CrdsFilter = struct { + filter: Bloom, + mask: u64, + mask_bits: u32, + + const Self = @This(); + + /// only used in tests + pub fn init(allocator: std.mem.Allocator) Self { + return Self{ + .filter = Bloom.init(allocator, 0, null), + .mask = 18_446_744_073_709_551_615, + .mask_bits = 0, + }; + } + + pub fn compute_mask(index: u64, mask_bits: u32) u64 { + std.debug.assert(mask_bits > 0); + std.debug.assert(index <= std.math.pow(u64, 2, mask_bits)); + // eg, with index = 2 and mask_bits = 3 + // shift_bits = 61 (ie, only look at first 2 bits) + const shift_bits: u6 = @intCast(64 - mask_bits); + // 2 << 61 = 100...000 + const shifted_index = index << shift_bits; + // OR with all the other zeros + // 10 111111..11111 + // ^ ^ + // index (mask_bits length) | rest + return shifted_index | (~@as(u64, 0) >> @as(u6, 
@intCast(mask_bits))); + } + + pub fn compute_mask_bits(num_items: f64, max: f64) u32 { + return @intFromFloat(@max(0, (std.math.ceil(std.math.log2(num_items / max))))); + } + + pub fn compute_max_items(max_bits: f64, false_rate: f64, num_keys: f64) f64 { + const m = max_bits; + const p = false_rate; + const k = num_keys; + return std.math.ceil(m / (-k / @log(@as(f64, 1) - exp(@log(p) / k)))); + } + + pub fn deinit(self: *Self) void { + self.filter.deinit(); + } +}; + +pub fn hash_to_u64(hash: *const Hash) u64 { + const buf = hash.data[0..8]; + return std.mem.readIntLittle(u64, buf); +} + +test "gossip.pull: test build_crds_filters" { + var crds_table = try CrdsTable.init(std.testing.allocator); + defer crds_table.deinit(); + + // insert a some value + const kp = try KeyPair.create([_]u8{1} ** 32); + + var seed: u64 = @intCast(std.time.milliTimestamp()); + var rand = std.rand.DefaultPrng.init(seed); + const rng = rand.random(); + + for (0..64) |_| { + var id = Pubkey.random(rng, .{}); + var legacy_contact_info = crds.LegacyContactInfo.default(); + legacy_contact_info.id = id; + var crds_value = try crds.CrdsValue.initSigned(crds.CrdsData{ + .LegacyContactInfo = legacy_contact_info, + }, kp); + + try crds_table.insert(crds_value, 0, null); + } + + const max_bytes = 2; + const num_items = crds_table.len(); + + // build filters + var filters = try build_crds_filters(std.testing.allocator, &crds_table, max_bytes); + defer deinit_crds_filters(&filters); + + const mask_bits = filters.items[0].mask_bits; + + // assert value is in the filter + const crds_values = crds_table.store.iterator().values; + for (0..num_items) |i| { + const versioned_value = crds_values[i]; + const hash = versioned_value.value_hash; + + const index = CrdsFilterSet.hash_index(mask_bits, &hash); + const filter = filters.items[index].filter; + try std.testing.expect(filter.contains(&hash.data)); + } +} + +test "gossip.pull: CrdsFilterSet deinits correct" { + var filter_set = try 
CrdsFilterSet.init(std.testing.allocator, 10000, 200); + + const hash = Hash.random(); + filter_set.add(&hash); + + const index = CrdsFilterSet.hash_index(filter_set.mask_bits, &hash); + var bloom = filter_set.filters.items[index]; + + const v = bloom.contains(&hash.data); + try std.testing.expect(v); + + var f = try filter_set.consume_for_crds_filters(std.testing.allocator, 10); + defer deinit_crds_filters(&f); + + try std.testing.expect(f.capacity == filter_set.len()); + + const x = f.items[index]; + try std.testing.expect(x.filter.contains(&hash.data)); +} + +test "gossip.pull: helper functions are correct" { + { + const v = CrdsFilter.compute_max_items(100.5, 0.1, 10.0); + try std.testing.expectEqual(@as(f64, 16), v); + } + + { + const v = CrdsFilter.compute_mask_bits(800, 100); + try std.testing.expectEqual(@as(usize, 3), v); + } + + { + const v = Hash{ .data = .{1} ++ .{0} ** 31 }; + try std.testing.expectEqual(@as(u64, 1), hash_to_u64(&v)); + } + + { + const v = CrdsFilter.compute_mask(2, 3); + // 101111111111111111111111111111111111111111111111111111111111111 + try std.testing.expectEqual(@as(u64, 6917529027641081855), v); + } +} + +test "gossip.pull: crds filter matches rust bytes" { + const rust_bytes = [_]u8{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 255, 255, 255, 255, 255, 255, 0, 0, 0, 0 }; + var filter = CrdsFilter.init(std.testing.allocator); + defer filter.deinit(); + + var buf = [_]u8{0} ** 1024; + var bytes = try bincode.writeToSlice(buf[0..], filter, bincode.Params.standard); + try std.testing.expectEqualSlices(u8, rust_bytes[0..], bytes); +} diff --git a/src/gossip/pull_response.zig b/src/gossip/pull_response.zig new file mode 100644 index 000000000..e5d5df460 --- /dev/null +++ b/src/gossip/pull_response.zig @@ -0,0 +1,147 @@ +const std = @import("std"); +const SocketAddr = @import("net.zig").SocketAddr; +const Tuple = std.meta.Tuple; +const Hash = @import("../core/hash.zig").Hash; +const Signature = 
@import("../core/signature.zig").Signature; +const Transaction = @import("../core/transaction.zig").Transaction; +const Slot = @import("../core/slot.zig").Slot; +const Option = @import("../option.zig").Option; +const ContactInfo = @import("node.zig").ContactInfo; +const bincode = @import("../bincode/bincode.zig"); +const ArrayList = std.ArrayList; +const ArrayListConfig = @import("../utils/arraylist.zig").ArrayListConfig; +const Bloom = @import("../bloom/bloom.zig").Bloom; +const KeyPair = std.crypto.sign.Ed25519.KeyPair; +const Pubkey = @import("../core/pubkey.zig").Pubkey; +const exp = std.math.exp; + +const CrdsTable = @import("crds_table.zig").CrdsTable; +const crds = @import("crds.zig"); +const CrdsValue = crds.CrdsValue; + +const crds_pull_req = @import("./pull_request.zig"); +const CrdsFilter = crds_pull_req.CrdsFilter; + +pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000; + +// TODO: make it batch +pub fn filter_crds_values( + alloc: std.mem.Allocator, + crds_table: *CrdsTable, + value: *CrdsValue, + filter: *CrdsFilter, + output_size_limit: usize, + now: u64, +) !ArrayList(CrdsValue) { + crds_table.read(); + defer crds_table.release_read(); + + if (output_size_limit == 0) { + return ArrayList(CrdsValue).init(alloc); + } + + var caller_wallclock = value.wallclock(); + const is_too_old = caller_wallclock < now -| CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS; + const is_too_new = caller_wallclock > now +| CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS; + if (is_too_old or is_too_new) { + return ArrayList(CrdsValue).init(alloc); + } + + var seed: u64 = @intCast(std.time.milliTimestamp()); + var rand = std.rand.DefaultPrng.init(seed); + const rng = rand.random(); + + const jitter = rng.intRangeAtMost(u64, 0, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 4); + caller_wallclock = caller_wallclock + jitter; + + var output = ArrayList(CrdsValue).init(alloc); + var bloom = filter.filter; + + var match_indexs = try crds_table.get_bitmask_matches(alloc, filter.mask, filter.mask_bits); + defer 
match_indexs.deinit(); + + for (match_indexs.items) |entry_index| { + var entry = crds_table.store.iterator().values[entry_index]; + + // entry is too new + if (entry.value.wallclock() > caller_wallclock) { + continue; + } + // entry is already contained in the bloom + if (bloom.contains(&entry.value_hash.data)) { + continue; + } + // exclude contact info (? not sure why - labs does it) + if (entry.value.data == crds.CrdsData.ContactInfo) { + continue; + } + + // good + try output.append(entry.value); + if (output.items.len == output_size_limit) { + break; + } + } + + return output; +} + +test "gossip.pull: test filter_crds_values" { + var crds_table = try CrdsTable.init(std.testing.allocator); + defer crds_table.deinit(); + + // insert a some value + const kp = try KeyPair.create([_]u8{1} ** 32); + + var seed: u64 = @intCast(std.time.milliTimestamp()); + var rand = std.rand.DefaultPrng.init(seed); + const rng = rand.random(); + + for (0..100) |_| { + // var id = Pubkey.random(rng, .{}); + // var legacy_contact_info = crds.LegacyContactInfo.default(); + // legacy_contact_info.wallclock = 40; + // legacy_contact_info.id = id; + // var crds_value = try crds.CrdsValue.initSigned(crds.CrdsData{ + // .LegacyContactInfo = legacy_contact_info, + // }, kp); + + var crds_value = try crds.CrdsValue.random(rng, kp); + try crds_table.insert(crds_value, 0, null); + } + + const max_bytes = 10; + + // recver + var filters = try crds_pull_req.build_crds_filters(std.testing.allocator, &crds_table, max_bytes); + defer crds_pull_req.deinit_crds_filters(&filters); + var filter = filters.items[0]; + + // corresponding value + const pk = kp.public_key; + var id = Pubkey.fromPublicKey(&pk, true); + var legacy_contact_info = crds.LegacyContactInfo.default(); + legacy_contact_info.id = id; + legacy_contact_info.wallclock = @intCast(std.time.milliTimestamp()); + var crds_value = try CrdsValue.initSigned(crds.CrdsData{ + .LegacyContactInfo = legacy_contact_info, + }, kp); + + // insert more 
values which the filters should be missing + for (0..64) |_| { + var v2 = try crds.CrdsValue.random(rng, kp); + try crds_table.insert(v2, 0, null); + } + + var values = try filter_crds_values( + std.testing.allocator, + &crds_table, + &crds_value, + &filter, + 100, + @intCast(std.time.milliTimestamp()), + ); + defer values.deinit(); + + try std.testing.expect(values.items.len > 0); +} diff --git a/src/gossip/readme.md b/src/gossip/readme.md index 63f10a0db..9b2d07d00 100644 --- a/src/gossip/readme.md +++ b/src/gossip/readme.md @@ -9,3 +9,338 @@ This function is called and `spawn`ed and is a long running process. It's where ### `ClusterInfo::run_listen` - Listens for Packet's to process This function `spawn`ed and is a long running process. It listens to the packet receiver channel and then processes packets as they're pushed. + +## File outline + +- `crds_table.zig`: where gossip data is stored +- `crds.zig`: various gossip data structure definitions +- `pull_request.zig`: logic for sending pull *requests* +- `pull_response.zig`: logic for sending pull *responses* (/handling incoming pull requests) +- `crds_shards.zig`: datastructure which stores gossip data hashes for quick lookup - used in `crds_table` and constructing pull responses + +## Datatypes and Datastructures + +- data types which we track are defined in `crds.zig` which include `CrdsData` and `CrdsValue` + - a `CrdsData` is an enum over the possible gossip data types + - a `CrdsValue` contains a `CrdsData` struct and a signature of the `CrdsData` - these are propogated on the network and their signature is verified before processing their data. + +- there are many `CrdsData` types used throughout the codebase, but ones of particular importance include: + - `ContactInfo`/`LegacyContactInfo`: which includes node specific information such as the nodes public key and socket addresses for specific validator tasks (including gossip, tvu, tpu, repair, ...). 
This structure is critical for discovering the rest of the network. + - `Vote`: which includes a validators signature that a specific block is valid. Note, this data is slowly being phased out of the gossip protocol because its not required and takes up a lot of network bandwidth. + - `EpochSlots`: // TODO + - `DuplicateShred`: // TODO + +### CrdsTable: Storing Gossip Data + +- gossip data is stored in a Conflict-free Replicated Data Store (CRDS) located in `crds_table.zig`. +- to store this data we use an indexable HashMap which uses a `CrdsValueLabel` as +its keys and a `CrdsVersionedValue` as its values + +
+ +
+ +- new crds data comes from push messages and pull responses + +### ValueLabels and VersionedValues + +- each `CrdsData` type has a corresponding `CrdsValueLabel` which defines how the data is stored/replaced +- for example a `LegacyContactInfo` struct includes many socket address fields, however, its corresponding label is only its pubkey + +```zig= +// the full contact info struct (including pubkeys, sockets, and more) +pub const LegacyContactInfo = struct { + id: Pubkey, + /// gossip address + gossip: SocketAddr, + /// address to connect to for replication + tvu: SocketAddr, + /// address to forward shreds to + tvu_forwards: SocketAddr, + /// address to send repair responses to + repair: SocketAddr, + /// transactions address + tpu: SocketAddr, + //... +} + +// the corresponding label (only the Pubkey) +pub const CrdsValueLabel = union(enum) { + LegacyContactInfo: Pubkey, + //... +} +``` + +- assuming each validator corresponds to one pubkey, this means we'll only store one `ContactInfo` per validator. +- when inserting a `CrdsData` whose label already exists in the table, we keep the one with the largest wallclock time (ie, the newest) and discard the other.
+ +### Storing Specific Data Types + +- we're also interested in storing specific datatypes in the table efficiently +- for example, when broadcasting data to the rest of the network, it would be nice to have all the `ContactInfo` values which are stored in the CRDS table +- this is why we use an **indexable** hash map implementation + + - for example, when inserting values into the table, we receive its corresponding index from the insertion (`crds_index = crds_table.insert(&versioned_value)`) + - we can then store these indexes in an array (`contact_infos.append(crds_index)`) + - to retrieve these values, we can iterate over the array, index into the table, and retrieve the corresponding data values (`versioned_value = crds_table[crds_index]`) + - we follow this approach for all of the data types such as: `ContactInfos`, `Votes`, `EpochSlots`, `DuplicateShreds`, and `ShredVersions` + +### Retrieving Specific Data Types + +- to efficiently retrieve *new* data from the table, we also track a `cursor` variable which is the head of the table and is monotonically incremented on each insert/update +- we can then use getter functions such as `get_votes_with_cursor`, which allows you to retrieve votes which are past a certain cursor index +- a listener would track their own cursor and periodically call the getter functions to retrieve new values + +## Protocol Messages: Pull + +### Building Pull *Requests* + +- pull requests are used to retrieve gossip data which the node is missing - they are sent to random peers periodically and say, "i have these values in my crds table, can you send me anything I'm missing" +- to say this, we construct a bloom filter over the hashes of values stored in the crds table + - the majority of code can be found in `pull_requests.zig` and `src/bloom/bloom.zig` + +- since there are a lot of values in the crds table, instead of constructing one large bloom filter to send to all validators, we partition the crds data across multiple filters
based on the hash value's first `N` bits + - we do this with the `CrdsFilterSet` struct which is a list of `CrdsFilters` +- for example, if we are partitioning on the first 3 bits of hash values we would use 2^3 = 8 `Bloom` filters: + - the first bloom containing hash values whose bits start with 000 + - the second bloom containing hash values whose bits start with 001 + - ... + - and lastly, the eighth bloom containing hash values whose bits start with 111 +- for example, if we were tracking a `Hash` with bits 00101110101, we would only consider its first 3 bits, 001, and so we would add the hash to the first bloom filter (`@cast(usize, 001) = 1`) +- you can think of this as a custom hash function for a hash map, where the keys are `Hash` values, the values are `Bloom` filters, and the function is to consider the first `N` bits of the hash +- the first `N` bits are called the `mask_bits` in the code which depends on many factors including the desired false-positive rate of the bloom filters, the number of items in the crds table, and more +- after we construct this filter set (ie, compute the `mask_bits` and init `2^mask_bits` bloom filters), we then need to add all of the `CrdsValues` to it +- some pseudocode is below
+ +
+ +```python +## main function for building pull requests +def build_crds_filters( + crds_table: *CrdsTable +) Vec: + + values = crds_table.values() + filter_set = CrdsFilterSet.init(len(value)) + + for value in values: + filter_set.add(value) + + # CrdsFilterSet => Vec + return filter_set.consumeForCrdsFilters() + +class CrdsFilterSet(): + mask_bits: u64 + filters: Vec + + def init(self, num_items): + self.mask_bits = ... # compute the mask_bits + n_filters = 1 << mask_bits # 2^mask_bits + + self.filters = [] + for i in 0..n_filters: + self.filters.append(Bloom.random()) + + def add(hash: Hash): + # compute the hash index (ie, the first mask_bits bits of the Hash) + # eg: + # hash: 001010101010101..1 + # mask_bits = 3 + # shift_bits = 64 - 3 (note: u64 has 64 bits) + # hash >> shift_bits = 001 (first three bits) = index 1 + # == filters[1].add(hash) + shift_bits = 64 - mask_bits + index = @as(usize, hash >> shift_bits) + self.filters[index].add(hash) +``` + +- after adding all values to the `CrdsFilterSet`, we then need to consume the set into a vector of `CrdsFilters` which we'll send to different peers +- to idenitfy which hash bits each filter contains, we use a mask + - eg, the mask of the first filter would be `000`, the mask of the second filter would be `001`, the third filter would be `010`, ... 
+ - when a node receives a pull request, the mask will be used to efficiently find all the values whose hash matches the mask (ie, if you received the `010` mask, you would look up all hash values whose first 3 bits are `010`) and then find values which are not included in the bloom filter + +```python + def consumeForCrdsFilters(self: CrdsFilterSet) Vec: + for index in 0..len(self.filters): + crds_filter = CrdsFilter( + bloom=self.filters[index], + mask=CrdsFilter.compute_mask(index, self.mask_bits), + mask_bits=self.mask_bits, + ) +``` + +- the logic is similar to the bit operations above and is computed in `CrdsFilter.compute_mask(index, self.mask_bits)` + +```python +fn compute_mask(index: u64, mask_bits: u64) u64: + # shift the index to the first `mask_bits` of the u64 + # eg, + # index = 1 + # mask_bits = 3 + # shift_bits = 64 - 3 (note: u64 has 64 bits) + shift_bits = 64 - mask_bits + # shifted_index = 1 << (64 - 3) = 001000000000000...0 + shifted_index = index << shift_bits + # ones = 000111111111111..1 + ones = (~@as(u64, 0) >> @as(u6, @intCast(mask_bits))) + # result = 001111111111111..1 + return shifted_index | ones; +``` +- notice how the result will be ones everywhere except for the first `mask_bits` bits, which represent the index +- after getting the vector of filters, we send each filter out to a random peer weighted by stake weight + +### Building Pull *Responses* + +- sending a pull *response* requires you to iterate over values stored in the Crds table, filter the values to match the `CrdsFilter`'s `mask`, and find values which are not included in the request's `Bloom` filter +- the main function which we use is `filter_crds_values` which takes a `CrdsFilter` as input and returns a vector of `CrdsValues` + - first it calls `crds_table.get_bitmask_matches` which returns the entries in the crds table which match the filter's `mask` + - to do this efficiently, we introduce a new data structure called `CrdsShards` which is located in
`crds_shards.zig` + +
+ +
+ +#### `CrdsShards` + +- `CrdsShards` stores hash values efficiently based on the first `shard_bits` of a hash value (similar to the `CrdsFilterSet` structure) +- the main structure is `shards = [4096]AutoArrayHashMap(usize, u64),` where `shards[k]` includes crds values which the first `shard_bits` of their hash value is equal to `k` + - `usize` is the index of the value in the crds table + - and `u64` is the hash value represented as a `u64` + - also note that `shard_bits` is hardcoded in the program as `12`, so we will have 2^12 = 4096 shard indexs + - this allows us to quickly look up all the crds values whos hash matches a pull requests `mask` (compared to iterating over all the crds values) + +- whenever we insert a new value in the `CrdsTable`, we insert its hash value into the `CrdsShard` structure and so the struct is stored on the `CrdsTable` +- the insertion logic is straightforward + - take the first 8 bytes of a hash and cast it to a `u64` (`hash_u64 = @as(u64, hash[0..8])`) + - compute the first `shard_bits` bits of the `u64` by computing `shard_index = hash_u64 >> (64 - shard_bits)` + - get the shard: `self.shards[shard_index]` + - insert the crds table index along with the `u64_hash` + +```python +def insert(self: *CrdsShards, crds_index: usize, hash: *const Hash): + shard_index = @as(u64, hash[0..8]) >> (64 - shard_bits) + shard = self.shard[shard_index] + shard.put(crds_index, uhash); +``` + +
+ +
+ +#### `CrdsShards`: Finding hash matches + +- now to build the pull response, we need to retrieve hash values which match a `mask` (ie, their first `mask_bit` bits are equal to `mask`) +- when `shard_bits == mask_bits` its very straightforward, we just lookup the shard corresponding to the first `shard_bits` of `mask` and return its values + +
+ +
+ +```python +def find_matches(self: *CrdsShards, mask: u64, mask_bits: u64) Vec: + if (self.shard_bits == mask_bits) { + shard = self.shard[(mask >> (64 - self.shard_bits)] + crds_indexs = shard.keys() + return crds_indexs + } else { + # TODO: + } +``` + +- when `shard_bits < mask_bits`, the mask is tracking more bits than the shards are, so we know if we truncate the mask up to `shard_bits`, we can grab the shard and iterate over those values and to find exact matches + - truncating and looking up the shard gives us hashes which have a matching first `shard_bits` + - we then need to check to make sure the last `shard_bits - mask_bits` match the mask which we do through iteration + +
+ +
+ +```python +def find_matches(self: *CrdsShards, mask: u64, mask_bits: u64) Vec: + # ones everywhere except for the first `mask_bits` + mask_ones = (~0 >> mask_bits) + + if (self.shard_bits == mask_bits) { + # ... + } else if (self.shard_bits < mask_bits) { + # truncate the mask + shard_index = mask << (64 - self.shard_bits) + shard = self.shards[shard_index] + + # scan for matches + crds_indexs = [] + for (indexs, hash_u64) in shard: + if ((hash_u64 | mask_ones) == (mask | mask_ones)): # match! + crds_indexs.append(indexs) + return crds_indexs + + } else { + # TODO + } +``` + +- when `shard_bits > mask_bits` the shards is tracking more information than the mask, so we'll need to lookup multiple shards to find all the values which match `mask` +- for example, + - if shard_bits = 4 and mask_bits = 2 and our mask is 01 + - the possible shards well need to lookup are: 0100, 0101, 0110, 0111 + - ie, there will be 4 shards that match the mask represented by the difference in bits + - so, we know we'll have to look up `2^(shard_bits - mask_bits)` number of shards which can be computed using `count = 1 << (shard_bits - mask_bits)` + - the final shard would be the mask followed by all ones (ie, 0111 in the example above) at the end which can be computed as `end = (mask | mask_ones) >> shard_bits` + - since we know the final shard and the number of shards were looking for, we can iterate over them from `index = (end-count)..end` + +
+ +
+ +```python +def find_matches(self: *CrdsShards, mask: u64, mask_bits: u64) Vec: + # ones everywhere except for the first `mask_bits` + mask_ones = (~0 >> mask_bits) + + if (self.shard_bits == mask_bits) { + # ... + } else if (self.shard_bits < mask_bits) { + # ... + } else if (self.shard_bits > mask_bits) { + shift_bits = self.shard_bits - mask_bits + count = 1 << shift_bits + end = (mask | mask_ones) >> shard_bits + + crds_indexs = [] + for shard_index in (end-count)..end: + shard = self.shards[shard_index] + indexs = shard.keys() + crds_indexs.append(indexs) + + return crds_indexs + } +``` + +- after we have all the crds indexs which match the next check is check which values are not included in the request's bloom filter +- this is fairly straightforward +- the psuedo code looks like + +```python + +def filter_crds_values( + crds_table: *CrdsTable + filter: *CrdsFilter +) Vec: + var match_indexs = crds_table.get_bitmask_matches(filter.mask, filter.mask_bits); + + values = [] + for index in match_indexs: + entry = crds_table[index] + if (!filter.bloom.contains(entry.hash)): + values.append(entry) + + return values +``` + +## Push Requests + +## Ping/Pong + +## Prune Messages diff --git a/src/lib.zig b/src/lib.zig index 62415294e..b87c096b1 100644 --- a/src/lib.zig +++ b/src/lib.zig @@ -24,6 +24,9 @@ pub const gossip = struct { pub usingnamespace @import("gossip/node.zig"); pub usingnamespace @import("gossip/packet.zig"); pub usingnamespace @import("gossip/protocol.zig"); + pub usingnamespace @import("gossip/pull_request.zig"); + pub usingnamespace @import("gossip/pull_response.zig"); + pub usingnamespace @import("gossip/crds_shards.zig"); }; pub const bloom = struct { diff --git a/src/rpc/client.zig b/src/rpc/client.zig index 1c20797b6..824ce6829 100644 --- a/src/rpc/client.zig +++ b/src/rpc/client.zig @@ -1328,8 +1328,12 @@ test "pubkey equality works" { } test "pubkey randome works" { - var pubkey = Pubkey.random(.{ .seed = 20 }); - var pubkey_2 = 
Pubkey.random(.{ .seed = 19 }); + var seed: u64 = @intCast(std.time.milliTimestamp()); + var rand = std.rand.DefaultPrng.init(seed); + const rng = rand.random(); + + var pubkey = Pubkey.random(rng, .{}); + var pubkey_2 = Pubkey.random(rng, .{}); try testing.expect(!pubkey_2.equals(&pubkey)); } diff --git a/src/utils/arraylist.zig b/src/utils/arraylist.zig index 455f38787..04658e5c0 100644 --- a/src/utils/arraylist.zig +++ b/src/utils/arraylist.zig @@ -1,7 +1,7 @@ const std = @import("std"); const bincode = @import("../bincode/bincode.zig"); -pub fn ArrayListConfig(comptime Child: type) bincode.FieldConfig { +pub fn ArrayListConfig(comptime Child: type) bincode.FieldConfig(std.ArrayList(Child)) { const S = struct { pub fn serialize(writer: anytype, data: anytype, params: bincode.Params) !void { var list: std.ArrayList(Child) = data; @@ -12,7 +12,7 @@ pub fn ArrayListConfig(comptime Child: type) bincode.FieldConfig { return; } - pub fn deserialize(allocator: ?std.mem.Allocator, comptime T: type, reader: anytype, params: bincode.Params) !T { + pub fn deserialize(allocator: ?std.mem.Allocator, reader: anytype, params: bincode.Params) !std.ArrayList(Child) { var ally = allocator.?; var len = try bincode.read(ally, u64, reader, params); var list = try std.ArrayList(Child).initCapacity(ally, @as(usize, len)); @@ -24,7 +24,7 @@ pub fn ArrayListConfig(comptime Child: type) bincode.FieldConfig { } }; - return bincode.FieldConfig{ + return bincode.FieldConfig(std.ArrayList(Child)){ .serializer = S.serialize, .deserializer = S.deserialize, }; diff --git a/src/utils/shortvec.zig b/src/utils/shortvec.zig index 11db94cde..619ab0da9 100644 --- a/src/utils/shortvec.zig +++ b/src/utils/shortvec.zig @@ -4,57 +4,60 @@ const network = @import("zig-network"); const serialize_short_u16 = @import("varint.zig").serialize_short_u16; const deserialize_short_u16 = @import("varint.zig").deserialize_short_u16; -pub fn ShortVecConfig(comptime childSerialize: bincode.SerializeFunction, comptime 
childDeserialize: bincode.DeserializeFunction) bincode.FieldConfig { +pub fn ShortVecConfig(comptime Child: type) bincode.FieldConfig([]Child) { const S = struct { pub fn serialize(writer: anytype, data: anytype, params: bincode.Params) !void { var len: u16 = std.math.cast(u16, data.len) orelse return error.DataTooLarge; try serialize_short_u16(writer, len, params); for (data) |item| { - try childSerialize(writer, item, params); + try bincode.write(null, writer, item, params); } - return; } - pub fn deserialize(allocator: ?std.mem.Allocator, comptime T: type, reader: anytype, params: bincode.Params) !void { + pub fn deserialize(allocator: ?std.mem.Allocator, reader: anytype, params: bincode.Params) ![]Child { var ally = allocator.?; - var len = try deserialize_short_u16(ally, u16, reader, params); - var elems = try ally.alloc(T.child, len); + var len = try deserialize_short_u16(reader, params); + var elems = try ally.alloc(Child, len); for (0..len) |i| { - elems[i] = childDeserialize(ally, T.child, reader, params); + elems[i] = try bincode.read(ally, Child, reader, params); } return elems; } }; - return bincode.FieldConfig{ + return bincode.FieldConfig([]Child){ .serializer = S.serialize, .deserializer = S.deserialize, }; } -pub const shortvec_config = bincode.FieldConfig{ - .serializer = serilaize_shortvec, - .deserializer = deserialize_shortvec, -}; - -pub fn serilaize_shortvec(writer: anytype, data: anytype, params: bincode.Params) !void { - var len = std.math.cast(u16, data.len) orelse return error.DataTooLarge; - try serialize_short_u16(writer, len, params); - for (data) |item| { - try bincode.write(null, writer, item, params); - } - return; -} +pub fn ShortVecArrayListConfig(comptime Child: type) bincode.FieldConfig(std.ArrayList(Child)) { + const S = struct { + pub fn serialize(writer: anytype, data: anytype, params: bincode.Params) !void { + var list: std.ArrayList(Child) = data; + var len = std.math.cast(u16, list.items.len) orelse return 
error.DataTooLarge; + try serialize_short_u16(writer, len, params); + for (list.items) |item| { + try bincode.write(null, writer, item, params); + } + } + + pub fn deserialize(allocator: ?std.mem.Allocator, reader: anytype, params: bincode.Params) !std.ArrayList(Child) { + var ally = allocator.?; -pub fn deserialize_shortvec(allocator: ?std.mem.Allocator, comptime T: type, reader: anytype, params: bincode.Params) !T { - var ally = allocator.?; + var len = try deserialize_short_u16(reader, params); + var list = try std.ArrayList(Child).initCapacity(ally, @as(usize, len)); + for (0..len) |_| { + var item = try bincode.read(ally, Child, reader, params); + try list.append(item); + } + return list; + } + }; - const Child = @typeInfo(T).Pointer.child; - var len = try deserialize_short_u16(ally, u16, reader, params); - var elems = try ally.alloc(Child, len); - for (0..len) |i| { - elems[i] = try bincode.read(ally, Child, reader, params); - } - return elems; + return bincode.FieldConfig(std.ArrayList(Child)){ + .serializer = S.serialize, + .deserializer = S.deserialize, + }; } diff --git a/src/utils/shortvec_arraylist.zig b/src/utils/shortvec_arraylist.zig deleted file mode 100644 index 01fcc3d31..000000000 --- a/src/utils/shortvec_arraylist.zig +++ /dev/null @@ -1,34 +0,0 @@ -const std = @import("std"); -const bincode = @import("../bincode/bincode.zig"); -const serialize_short_u16 = @import("varint.zig").serialize_short_u16; -const deserialize_short_u16 = @import("varint.zig").deserialize_short_u16; - -pub fn ShortVecArrayListConfig(comptime Child: type) bincode.FieldConfig { - const S = struct { - pub fn serialize(writer: anytype, data: anytype, params: bincode.Params) !void { - var list: std.ArrayList(Child) = data; - var len = std.math.cast(u16, list.items.len) orelse return error.DataTooLarge; - try serialize_short_u16(writer, len, params); - for (list.items) |item| { - try bincode.write(null, writer, item, params); - } - return; - } - - pub fn deserialize(allocator: 
?std.mem.Allocator, comptime T: type, reader: anytype, params: bincode.Params) !T { - var ally = allocator.?; - var len = try deserialize_short_u16(ally, u16, reader, params); - var list = try std.ArrayList(Child).initCapacity(ally, @as(usize, len)); - for (0..len) |_| { - var item = try bincode.read(ally, Child, reader, params); - try list.append(item); - } - return list; - } - }; - - return bincode.FieldConfig{ - .serializer = S.serialize, - .deserializer = S.deserialize, - }; -} diff --git a/src/utils/varint.zig b/src/utils/varint.zig index 839d7c30e..51a2ba28b 100644 --- a/src/utils/varint.zig +++ b/src/utils/varint.zig @@ -1,65 +1,75 @@ const std = @import("std"); const bincode = @import("../bincode/bincode.zig"); -pub const varint_config = bincode.FieldConfig{ - .serializer = serilaize_varint, - .deserializer = deserialize_varint, -}; - -pub fn serilaize_varint(writer: anytype, data: anytype, _: bincode.Params) !void { - var v = data; - while (v >= 0x80) { - var byte = @as(u8, @intCast(((v & 0x7F) | 0x80))); - v >>= 7; - try writer.writeByte(byte); - } - try writer.writeByte(@as(u8, @intCast(v))); - return; -} - -pub fn deserialize_varint(_: ?std.mem.Allocator, comptime T: type, reader: anytype, _: bincode.Params) !T { - var out: T = 0; +pub fn VarIntConfig(comptime VarInt: type) bincode.FieldConfig(VarInt) { + const S = struct { + pub fn serialize(writer: anytype, data: anytype, params: bincode.Params) !void { + _ = params; + var v = data; + while (v >= 0x80) { + var byte = @as(u8, @intCast(((v & 0x7F) | 0x80))); + v >>= 7; + try writer.writeByte(byte); + } + try writer.writeByte(@as(u8, @intCast(v))); + } - var t_bits: u8 = switch (T) { - u16 => 16, - u32 => 32, - u64 => 64, - else => { - return error.InvalidType; - }, + pub fn deserialize(allocator: ?std.mem.Allocator, reader: anytype, params: bincode.Params) !VarInt { + _ = params; + _ = allocator; + + var out: VarInt = 0; + + var t_bits: u8 = switch (VarInt) { + u16 => 16, + u32 => 32, + u64 => 64, + 
else => { + return error.InvalidType; + }, + }; + + const ShiftT: type = switch (VarInt) { + u16 => u4, + u32 => u5, + u64 => u6, + else => unreachable, + }; + + var shift: u32 = 0; + while (shift < t_bits) { + const byte: u8 = try reader.readByte(); + out |= @as(VarInt, @intCast((byte & 0x7F))) << @as(ShiftT, @intCast(shift)); + if (byte & 0x80 == 0) { + if (@as(u8, @intCast(out >> @as(ShiftT, @intCast(shift)))) != byte) { + return error.TruncatedLastByte; + } + if (byte == 0 and (shift != 0 or out != 0)) { + return error.NotValidTrailingZeros; + } + return out; + } + shift += 7; + } + return error.ShiftOverflows; + } }; - const ShiftT: type = switch (T) { - u16 => u4, - u32 => u5, - u64 => u6, - else => unreachable, + return bincode.FieldConfig(VarInt){ + .serializer = S.serialize, + .deserializer = S.deserialize, }; - - var shift: u32 = 0; - while (shift < t_bits) { - const byte: u8 = try reader.readByte(); - out |= @as(T, @intCast((byte & 0x7F))) << @as(ShiftT, @intCast(shift)); - if (byte & 0x80 == 0) { - if (@as(u8, @intCast(out >> @as(ShiftT, @intCast(shift)))) != byte) { - return error.TruncatedLastByte; - } - if (byte == 0 and (shift != 0 or out != 0)) { - return error.NotValidTrailingZeros; - } - return out; - } - shift += 7; - } - return error.ShiftOverflows; } -pub fn serialize_short_u16(writer: anytype, data: anytype, _: bincode.Params) !void { - var val: u16 = data; +pub const var_int_config_u16 = VarIntConfig(u16); +pub const var_int_config_u64 = VarIntConfig(u64); + +pub fn serialize_short_u16(writer: anytype, data: u16, _: bincode.Params) !void { + var value = data; while (true) { - var elem = @as(u8, @intCast((val & 0x7f))); - val >>= 7; - if (val == 0) { + var elem = @as(u8, @intCast((value & 0x7f))); + value >>= 7; + if (value == 0) { try writer.writeByte(elem); break; } else { @@ -69,7 +79,7 @@ pub fn serialize_short_u16(writer: anytype, data: anytype, _: bincode.Params) !v } } -pub fn deserialize_short_u16(_: ?std.mem.Allocator, comptime T: 
type, reader: anytype, _: bincode.Params) !T { +pub fn deserialize_short_u16(reader: anytype, _: bincode.Params) !u16 { var val: u16 = 0; for (0..MAX_ENCODING_LENGTH) |n| { var elem: u8 = try reader.readByte(); diff --git a/src/version/version.zig b/src/version/version.zig index ccd08b2e9..aa4acd1c4 100644 --- a/src/version/version.zig +++ b/src/version/version.zig @@ -1,4 +1,4 @@ -const varint_config = @import("../utils/varint.zig").varint_config; +const var_int_config_u16 = @import("../utils/varint.zig").var_int_config_u16; pub const CURRENT_VERSION: Version = Version.new(0, 1, 0, 0, 0, 4); @@ -34,8 +34,8 @@ pub const Version = struct { }; } - pub const @"!bincode-config:major" = varint_config; - pub const @"!bincode-config:minor" = varint_config; - pub const @"!bincode-config:patch" = varint_config; - pub const @"!bincode-config:client" = varint_config; + pub const @"!bincode-config:major" = var_int_config_u16; + pub const @"!bincode-config:minor" = var_int_config_u16; + pub const @"!bincode-config:patch" = var_int_config_u16; + pub const @"!bincode-config:client" = var_int_config_u16; }; diff --git a/todo.md b/todo.md deleted file mode 100644 index 731ef6f63..000000000 --- a/todo.md +++ /dev/null @@ -1,7 +0,0 @@ -## Todos / known issues: - -- [] look into `Pubkey.random()` panicing: - -``` -run test: error: thread 4706882 panic: written is not 44, written: 43, dest: { 117, 53, 76, 103, 105, 78, 111, 65, 77, 80, 88, 85, 52, 72, 77, 84, 68, 57, 75, 70, 57, 51, 86, 68, 110, 102, 111, 70, 53, 51, 111, 54, 119, 74, 84, 70, 86, 67, 49, 105, 69, 104, 86, 0 }, bytes: { 13, 87, 27, 207, 173, 29, 155, 207, 223, 224, 42, 52, 95, 175, 3, 2, 104, 111, 218, 33, 99, 90, 163, 24, 102, 43, 120, 128, 237, 218, 136, 72 } -```