From bee9622dcbd13e62a92df40be86fa1fcd13bd321 Mon Sep 17 00:00:00 2001 From: x19 <100000306+0xNineteen@users.noreply.github.com> Date: Thu, 26 Sep 2024 17:16:50 -0400 Subject: [PATCH] feat(gossip): improved metrics (#276) improves metrics for inserting gossip values into the gossip table from push messages and pull requests this pr also includes a few bug fixes: - grafana: now uses the correct delta on metrics (was prev using incorrect idelta) - table: prev a full table was not allowing updating of existing labels, this changes it - table: when gossip data was updated, we were not free-ing the old overwritten data, this fixes it - cmd: fixes the predefined mainnet entrypoints - pull_requests: dont send requests to peers which have a different shred version than us --- .../grafana/dashboards/gossip_metrics.json | 362 ++++++++++++++++-- src/cmd/cmd.zig | 10 +- src/gossip/fuzz_table.zig | 26 +- src/gossip/service.zig | 191 +++++---- src/gossip/table.zig | 253 ++++-------- 5 files changed, 540 insertions(+), 302 deletions(-) diff --git a/metrics/grafana/dashboards/gossip_metrics.json b/metrics/grafana/dashboards/gossip_metrics.json index 9977b2a1d..5494cb3f2 100644 --- a/metrics/grafana/dashboards/gossip_metrics.json +++ b/metrics/grafana/dashboards/gossip_metrics.json @@ -24,7 +24,7 @@ "editable": true, "fiscalYearStartMonth": 0, "graphTooltip": 0, - "id": 1, + "id": 3, "links": [], "liveNow": false, "panels": [ @@ -44,6 +44,7 @@ "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, + "barWidthFactor": 0.6, "drawStyle": "line", "fillOpacity": 0, "gradientMode": "none", @@ -173,6 +174,7 @@ "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, + "barWidthFactor": 0.6, "drawStyle": "line", "fillOpacity": 0, "gradientMode": "none", @@ -267,6 +269,7 @@ }, { "datasource": { + "default": true, "type": "prometheus" }, "fieldConfig": { @@ -281,6 +284,7 @@ "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, + "barWidthFactor": 0.6, "drawStyle": "line", "fillOpacity": 0, "gradientMode": "none", @@ -347,7 +351,7 @@ "datasource": {}, "disableTextWrap": false, "editorMode": "builder", - "expr": "idelta(ping_messages_dropped[$__interval])", + "expr": "delta(ping_messages_dropped[$__interval])", "fullMetaSearch": false, "includeNullMetadata": true, "legendFormat": "ping_messages_dropped", @@ -359,7 +363,7 @@ "datasource": {}, "disableTextWrap": false, "editorMode": "builder", - "expr": "idelta(pull_requests_dropped[$__interval])", + "expr": "delta(pull_requests_dropped[$__interval])", "fullMetaSearch": false, "hide": false, "includeNullMetadata": true, @@ -372,7 +376,7 @@ "datasource": {}, "disableTextWrap": false, "editorMode": "builder", - "expr": "idelta(prune_messages_dropped[$__interval])", + "expr": "delta(prune_messages_dropped[$__interval])", "fullMetaSearch": false, "hide": false, "includeNullMetadata": true, @@ -387,6 +391,7 @@ }, { "datasource": { + "default": true, "type": "prometheus" }, "fieldConfig": { @@ -458,14 +463,14 @@ "sort": "none" }, "xTickLabelRotation": 0, - "xTickLabelSpacing": 0 + "xTickLabelSpacing": 100 }, "targets": [ { "datasource": {}, "disableTextWrap": false, "editorMode": "builder", - "expr": "idelta(pull_requests_recv[$__interval])", + "expr": "delta(pull_requests_recv[$__interval])", "fullMetaSearch": false, "includeNullMetadata": true, "legendFormat": "pull_requests_recv", @@ -477,7 +482,7 @@ "datasource": {}, "disableTextWrap": false, "editorMode": "builder", - "expr": "idelta(pull_responses_recv[$__interval])", + "expr": "delta(pull_responses_recv[$__interval])", "fullMetaSearch": false, "hide": false, "includeNullMetadata": true, @@ -490,7 +495,7 @@ "datasource": {}, "disableTextWrap": false, "editorMode": "builder", - "expr": "idelta(prune_messages_recv[$__interval])", + "expr": "delta(prune_messages_recv[$__interval])", "fullMetaSearch": false, "hide": false, "includeNullMetadata": true, @@ -503,7 +508,7 @@ "datasource": {}, "disableTextWrap": false, "editorMode": "builder", - "expr": "idelta(push_messages_recv[$__interval])", + "expr": "delta(push_messages_recv[$__interval])", "fullMetaSearch": false, "hide": false, "includeNullMetadata": true, @@ -516,7 +521,7 @@ "datasource": {}, "disableTextWrap": false, "editorMode": "builder", - "expr": "idelta(ping_messages_recv[$__interval])", + "expr": "delta(ping_messages_recv[$__interval])", "fullMetaSearch": false, "hide": false, "includeNullMetadata": true, @@ -529,7 +534,7 @@ "datasource": {}, "disableTextWrap": false, "editorMode": "builder", - "expr": "idelta(pong_messages_recv[$__interval])", + "expr": "delta(pong_messages_recv[$__interval])", "fullMetaSearch": false, "hide": false, "includeNullMetadata": true, @@ -544,6 +549,7 @@ }, { "datasource": { + "default": true, "type": "prometheus" }, "fieldConfig": { @@ -615,7 +621,7 @@ "sort": "none" }, "xTickLabelRotation": 0, - "xTickLabelSpacing": 0 + "xTickLabelSpacing": 100 }, "pluginVersion": "10.4.1", "targets": [ @@ -623,7 +629,7 @@ "datasource": {}, "disableTextWrap": false, "editorMode": "builder", - "expr": "idelta(ping_messages_sent[$__interval])", + "expr": "delta(ping_messages_sent[$__interval])", "fullMetaSearch": false, "includeNullMetadata": true, "legendFormat": "ping_messages_sent", @@ -635,7 +641,7 @@ "datasource": {}, "disableTextWrap": false, "editorMode": "builder", - "expr": "idelta(pong_messages_sent[$__interval])", + "expr": "delta(pong_messages_sent[$__interval])", "fullMetaSearch": false, "hide": false, "includeNullMetadata": true, @@ -648,7 +654,7 @@ "datasource": {}, "disableTextWrap": false, "editorMode": "builder", - "expr": "idelta(prune_messages_sent[$__interval])", + "expr": "delta(prune_messages_sent[$__interval])", "fullMetaSearch": false, "hide": false, "includeNullMetadata": true, @@ -661,7 +667,7 @@ "datasource": {}, "disableTextWrap": false, "editorMode": "builder", - "expr": "idelta(pull_requests_sent[$__interval])", + "expr": "delta(pull_requests_sent[$__interval])", "fullMetaSearch": false, "hide": false, "includeNullMetadata": true, @@ -674,7 +680,7 @@ "datasource": {}, "disableTextWrap": false, "editorMode": "builder", - "expr": "idelta(pull_responses_sent[$__interval])", + "expr": "delta(pull_responses_sent[$__interval])", "fullMetaSearch": false, "hide": false, "includeNullMetadata": true, @@ -687,7 +693,7 @@ "datasource": {}, "disableTextWrap": false, "editorMode": "builder", - "expr": "idelta(push_messages_sent[$__interval])", + "expr": "delta(push_messages_sent[$__interval])", "fullMetaSearch": false, "hide": false, "includeNullMetadata": true, @@ -716,6 +722,7 @@ "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, + "barWidthFactor": 0.6, "drawStyle": "line", "fillOpacity": 0, "gradientMode": "none", @@ -756,7 +763,33 @@ ] } }, - "overrides": [] + "overrides": [ + { + "__systemRef": "hideSeriesFrom", + "matcher": { + "id": "byNames", + "options": { + "mode": "exclude", + "names": [ + "table_pubkeys_dropped", + "table_old_values_removed" + ], + "prefix": "All except:", + "readOnly": true + } + }, + "properties": [ + { + "id": "custom.hideFrom", + "value": { + "legend": false, + "tooltip": false, + "viz": true + } + } + ] + } + ] }, "gridPos": { "h": 8, @@ -861,6 +894,7 @@ "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, + "barWidthFactor": 0.6, "drawStyle": "line", "fillOpacity": 0, "gradientMode": "none", @@ -955,6 +989,7 @@ }, { "datasource": { + "default": true, "type": "prometheus" }, "fieldConfig": { @@ -969,6 +1004,7 @@ "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, + "barWidthFactor": 0.6, "drawStyle": "line", "fillOpacity": 0, "gradientMode": "none", @@ -999,7 +1035,8 @@ "mode": "absolute", "steps": [ { - "color": "green" + "color": "green", + "value": null }, { "color": "red", @@ -1036,11 +1073,12 @@ }, "disableTextWrap": false, "editorMode": "builder", - "expr": "idelta(push_message_n_values[$__interval])", + "expr": "push_message_n_invalid_shred_version", "fullMetaSearch": false, + "hide": false, "includeNullMetadata": true, "instant": false, - "legendFormat": "n_values", + "legendFormat": "invalid_shred_version", "range": true, "refId": "A", "useBackend": false @@ -1051,12 +1089,12 @@ }, "disableTextWrap": false, "editorMode": "builder", - "expr": "idelta(push_message_n_failed_inserts[$__interval])", + "expr": "push_message_n_new_inserts", "fullMetaSearch": false, "hide": false, "includeNullMetadata": true, "instant": false, - "legendFormat": "n_failed_inserts", + "legendFormat": "new_inserts", "range": true, "refId": "B", "useBackend": false @@ -1067,15 +1105,63 @@ }, "disableTextWrap": false, "editorMode": "builder", - "expr": "idelta(push_message_n_invalid_values[$__interval])", + "expr": "push_message_n_overwrite_existing", "fullMetaSearch": false, "hide": false, "includeNullMetadata": true, "instant": false, - "legendFormat": "n_invalid_values", + "legendFormat": "overwrites", "range": true, "refId": "C", "useBackend": false + }, + { + "datasource": { + "type": "prometheus" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "push_message_n_old_value", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "old_values", + "range": true, + "refId": "D", + "useBackend": false + }, + { + "datasource": { + "type": "prometheus" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "push_message_n_duplicate_value", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "duplicate_values", + "range": true, + "refId": "E", + "useBackend": false + }, + { + "datasource": { + "type": "prometheus" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "push_message_n_timeouts", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "timeouts", + "range": true, + "refId": "F", + "useBackend": false } ], "title": "Push Message Processing Info", @@ -1083,6 +1169,7 @@ }, { "datasource": { + "default": true, "type": "prometheus" }, "fieldConfig": { @@ -1097,6 +1184,7 @@ "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, + "barWidthFactor": 0.6, "drawStyle": "line", "fillOpacity": 0, "gradientMode": "none", @@ -1127,7 +1215,8 @@ "mode": "absolute", "steps": [ { - "color": "green" + "color": "green", + "value": null }, { "color": "red", @@ -1136,7 +1225,36 @@ ] } }, - "overrides": [] + "overrides": [ + { + "__systemRef": "hideSeriesFrom", + "matcher": { + "id": "byNames", + "options": { + "mode": "exclude", + "names": [ + "new_inserts", + "overwrites", + "old_values", + "duplicate_values", + "timeouts" + ], + "prefix": "All except:", + "readOnly": true + } + }, + "properties": [ + { + "id": "custom.hideFrom", + "value": { + "legend": false, + "tooltip": false, + "viz": true + } + } + ] + } + ] }, "gridPos": { "h": 8, @@ -1144,6 +1262,186 @@ "x": 12, "y": 24 }, + "id": 17, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "pull_response_n_invalid_shred_version", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "invalid_shred_version", + "range": true, + "refId": "A", + "useBackend": false + }, + { + "datasource": { + "type": "prometheus" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "pull_response_n_new_inserts", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "new_inserts", + "range": true, + "refId": "B", + "useBackend": false + }, + { + "datasource": { + "type": "prometheus" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "pull_response_n_overwrite_existing", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "overwrites", + "range": true, + "refId": "C", + "useBackend": false + }, + { + "datasource": { + "type": "prometheus" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "pull_response_n_old_value", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "old_values", + "range": true, + "refId": "D", + "useBackend": false + }, + { + "datasource": { + "type": "prometheus" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "pull_response_n_duplicate_value", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "duplicate_values", + "range": true, + "refId": "E", + "useBackend": false + }, + { + "datasource": { + "type": "prometheus" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "pull_response_n_duplicate_value", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "timeouts", + "range": true, + "refId": "F", + "useBackend": false + } + ], + "title": "Pull Response Processing Info", + "type": "timeseries" + }, + { + "datasource": { + "default": true, + "type": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 32 + }, "id": 14, "options": { "legend": { @@ -1164,7 +1462,7 @@ }, "disableTextWrap": false, "editorMode": "builder", - "expr": "idelta(push_messages_time_to_insert[$__interval])", + "expr": "delta(push_messages_time_to_insert[$__interval])", "fullMetaSearch": false, "includeNullMetadata": true, "instant": false, @@ -1180,7 +1478,7 @@ }, "disableTextWrap": false, "editorMode": "builder", - "expr": "idelta(push_messages_time_build_prune[$__interval])", + "expr": "delta(push_messages_time_build_prune[$__interval])", "fullMetaSearch": false, "hide": false, "includeNullMetadata": true, @@ -1203,13 +1501,13 @@ "list": [] }, "time": { - "from": "now-15m", + "from": "now-5m", "to": "now" }, "timepicker": {}, "timezone": "", "title": "Gossip Metrics", "uid": "jBuN47BVz", - "version": 13, + "version": 5, "weekStart": "" } \ No newline at end of file diff --git a/src/cmd/cmd.zig b/src/cmd/cmd.zig index 3a9babab1..34c5bd1f1 100644 --- a/src/cmd/cmd.zig +++ b/src/cmd/cmd.zig @@ -1217,15 +1217,15 @@ pub const Network = enum { switch (self) { .mainnet => { - predefined_entrypoints[len] = try E.fromSlice("entrypoint.mainnet.solana.com:8001"); + predefined_entrypoints[len] = try E.fromSlice("entrypoint.mainnet-beta.solana.com:8001"); len += 1; - predefined_entrypoints[len] = try E.fromSlice("entrypoint2.mainnet.solana.com:8001"); + predefined_entrypoints[len] = try E.fromSlice("entrypoint2.mainnet-beta.solana.com:8001"); len += 1; - predefined_entrypoints[len] = try E.fromSlice("entrypoint3.mainnet.solana.com:8001"); + predefined_entrypoints[len] = try E.fromSlice("entrypoint3.mainnet-beta.solana.com:8001"); len += 1; - predefined_entrypoints[len] = try E.fromSlice("entrypoint4.mainnet.solana.com:8001"); + predefined_entrypoints[len] = try E.fromSlice("entrypoint4.mainnet-beta.solana.com:8001"); len += 1; - predefined_entrypoints[len] = try E.fromSlice("entrypoint5.mainnet.solana.com:8001"); + predefined_entrypoints[len] = try E.fromSlice("entrypoint5.mainnet-beta.solana.com:8001"); len += 1; }, .testnet => { diff --git a/src/gossip/fuzz_table.zig b/src/gossip/fuzz_table.zig index 7935d1cb1..6608bb67c 100644 --- a/src/gossip/fuzz_table.zig +++ b/src/gossip/fuzz_table.zig @@ -118,8 +118,8 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void { // ! logger.debugf("putting pubkey: {}", .{pubkey}); - const did_insert = try gossip_table.insert(signed_data, now); - std.debug.assert(did_insert); + const result = try gossip_table.insert(signed_data, now); + std.debug.assert(result.wasInserted()); try keys.append(GossipKey{ .ContactInfo = pubkey }); try keypairs.append(keypair); @@ -150,20 +150,14 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void { } // ! - const did_insert = gossip_table.insert(signed_data, now) catch |err| blk: { - switch (err) { - GossipTable.InsertionError.OldValue => { - std.debug.assert(!should_overwrite); - }, - GossipTable.InsertionError.DuplicateValue => { - logger.debugf("duplicate value: {}", .{pubkey}); - }, - else => { - return err; - }, - } - break :blk false; - }; + const result = try gossip_table.insert(signed_data, now); + const did_insert = result.wasInserted(); + if (result == .IgnoredOldValue) { + std.debug.assert(!should_overwrite); + } + if (result == .IgnoredDuplicateValue) { + logger.debugf("duplicate value: {}", .{pubkey}); + } if (!should_overwrite and did_insert) { return error.ValueDidNotOverwrite; diff --git a/src/gossip/service.zig b/src/gossip/service.zig index 37b39a194..57f315582 100644 --- a/src/gossip/service.zig +++ b/src/gossip/service.zig @@ -66,6 +66,7 @@ const GossipMessageWithEndpoint = struct { from_endpoint: EndPoint, message: Gos pub const PULL_REQUEST_RATE = Duration.fromSecs(5); pub const PULL_RESPONSE_TIMEOUT = Duration.fromSecs(5); pub const ACTIVE_SET_REFRESH_RATE = Duration.fromSecs(15); +pub const DATA_TIMEOUT = Duration.fromSecs(15); pub const TABLE_TRIM_RATE = Duration.fromSecs(10); pub const BUILD_MESSAGE_LOOP_MIN = Duration.fromSecs(1); pub const PUBLISH_STATS_INTERVAL = Duration.fromSecs(2); @@ -91,7 +92,7 @@ pub const PING_CACHE_TTL = Duration.fromSecs(1280); pub const PING_CACHE_RATE_LIMIT_DELAY = Duration.fromSecs(1280 / 64); // TODO: replace with get_epoch_duration when BankForks is supported -const DEFAULT_EPOCH_DURATION: u64 = 172800000; +const DEFAULT_EPOCH_DURATION = Duration.fromMillis(172_800_000); pub const VERIFY_PACKET_PARALLEL_TASKS = 4; @@ -750,9 +751,6 @@ pub const GossipService = struct { defer gossip_table_lock.unlock(); const should_trim = gossip_table.shouldTrim(UNIQUE_PUBKEY_CAPACITY); - // NOTE: this counts as a trim attempt - self.stats.table_trim_call_count.inc(); - break :blk should_trim; }; @@ -1077,9 +1075,8 @@ pub const GossipService = struct { // filter out peers who have responded to pings const ping_cache_result = blk: { - var ping_cache_lock = self.ping_cache_rw.write(); - defer ping_cache_lock.unlock(); - var ping_cache: *PingCache = ping_cache_lock.mut(); + var ping_cache, var ping_cache_lg = self.ping_cache_rw.writeWithLock(); + defer ping_cache_lg.unlock(); const result = try ping_cache.filterValidPeers(self.allocator, self.my_keypair, peers); break :blk result; @@ -1134,16 +1131,20 @@ pub const GossipService = struct { .LegacyContactInfo = LegacyContactInfo.fromContactInfo(&self.my_contact_info), }, &self.my_keypair); + const my_shred_version = self.my_contact_info.shred_version; if (num_peers != 0) { for (filters.items) |filter_i| { // TODO: incorperate stake weight in random sampling const peer_index = rand.intRangeAtMost(usize, 0, num_peers - 1); const peer_contact_info_index = valid_gossip_peer_indexs.items[peer_index]; const peer_contact_info = peers[peer_contact_info_index]; + if (peer_contact_info.shred_version != my_shred_version) { + continue; + } if (peer_contact_info.gossip_addr) |gossip_addr| { const message = GossipMessage{ .PullRequest = .{ filter_i, my_contact_info_value } }; - var packet = &packet_batch.items[packet_index]; + const bytes = try bincode.writeToSlice(&packet.data, message, bincode.Params{}); packet.size = bytes.len; packet.addr = gossip_addr.toEndpoint(); @@ -1157,7 +1158,6 @@ pub const GossipService = struct { const entrypoint = self.entrypoints.items[@as(usize, @intCast(entrypoint_index))]; for (filters.items) |filter| { const message = GossipMessage{ .PullRequest = .{ filter, my_contact_info_value } }; - var packet = &packet_batch.items[packet_index]; const bytes = try bincode.writeToSlice(&packet.data, message, bincode.Params{}); packet.size = bytes.len; @@ -1413,51 +1413,63 @@ pub const GossipService = struct { defer failed_insert_ptrs.deinit(); { - var gossip_table_lock = self.gossip_table_rw.write(); - var gossip_table: *GossipTable = gossip_table_lock.mut(); - defer gossip_table_lock.unlock(); + var gossip_table, var gossip_table_lg = self.gossip_table_rw.writeWithLock(); + defer gossip_table_lg.unlock(); for (pull_response_messages.items) |*pull_message| { + const full_len = pull_message.gossip_values.len; const valid_len = self.filterBasedOnShredVersion( gossip_table, pull_message.gossip_values, pull_message.from_pubkey.*, ); + const invalid_shred_count = full_len - valid_len; const insert_results = try gossip_table.insertValues( now, pull_message.gossip_values[0..valid_len], PULL_RESPONSE_TIMEOUT.asMillis(), - true, - true, ); - - // silently insert the timeout values - // (without updating all associated origin values) - const timeout_indexs = insert_results.timeouts.?; - defer timeout_indexs.deinit(); - for (timeout_indexs.items) |index| { - _ = gossip_table.insert( - pull_message.gossip_values[index], - now, - ) catch {}; + defer insert_results.deinit(); + + for (insert_results.items) |result| { + switch (result) { + .InsertedNewEntry => self.stats.pull_response_n_new_inserts.inc(), + .OverwroteExistingEntry => self.stats.pull_response_n_overwrite_existing.inc(), + .IgnoredOldValue => self.stats.pull_response_n_old_value.inc(), + .IgnoredDuplicateValue => self.stats.pull_response_n_duplicate_value.inc(), + .IgnoredTimeout => self.stats.pull_response_n_timeouts.inc(), + .GossipTableFull => {}, + } } - - // update the contactInfo timestamps of the successful inserts - // (and all other origin values) - const successful_insert_indexs = insert_results.inserted.?; - defer successful_insert_indexs.deinit(); - for (successful_insert_indexs.items) |index| { - const origin = pull_message.gossip_values[index].id(); - gossip_table.updateRecordTimestamp(origin, now); + self.stats.pull_response_n_invalid_shred_version.add(invalid_shred_count); + + for (insert_results.items, 0..) |result, index| { + if (result.wasInserted()) { + // update the contactInfo (and all other origin values) timestamps of + // successful inserts + const origin = pull_message.gossip_values[index].id(); + gossip_table.updateRecordTimestamp(origin, now); + + switch (result) { + .OverwroteExistingEntry => |old_data| { + // if the value was overwritten, we need to free the old value + bincode.free(self.gossip_value_allocator, old_data); + }, + else => {}, + } + } else if (result == .IgnoredTimeout) { + // silently insert the timeout values + // (without updating all associated origin values) + _ = try gossip_table.insert( + pull_message.gossip_values[index], + now, + ); + } else { + try failed_insert_ptrs.append(&pull_message.gossip_values[index]); + } } gossip_table.updateRecordTimestamp(pull_message.from_pubkey.*, now); - - var failed_insert_indexs = insert_results.failed.?; - defer failed_insert_indexs.deinit(); - for (failed_insert_indexs.items) |index| { - try failed_insert_ptrs.append(&pull_message.gossip_values[index]); - } } } @@ -1531,47 +1543,57 @@ pub const GossipService = struct { for (batch_push_messages.items) |*push_message| { max_inserts_per_push = @max(max_inserts_per_push, push_message.gossip_values.len); } - var failed_insert_indexs = try std.ArrayList(usize) - .initCapacity(self.allocator, max_inserts_per_push); - defer failed_insert_indexs.deinit(); + var insert_results = try std.ArrayList(GossipTable.InsertResult).initCapacity(self.allocator, max_inserts_per_push); + defer insert_results.deinit(); // insert values and track the failed origins per pubkey { var timer = try sig.time.Timer.start(); - - var gossip_table_lock = self.gossip_table_rw.write(); - var gossip_table: *GossipTable = gossip_table_lock.mut(); - defer { - gossip_table_lock.unlock(); - const elapsed = timer.read().asMillis(); self.stats.push_messages_time_to_insert.observe(@floatFromInt(elapsed)); } - var n_gossip_data: usize = 0; - var n_failed_inserts: usize = 0; - var n_invalid_data: usize = 0; + var gossip_table, var gossip_table_lg = self.gossip_table_rw.writeWithLock(); + defer gossip_table_lg.unlock(); const now = getWallclockMs(); for (batch_push_messages.items) |*push_message| { - n_gossip_data += push_message.gossip_values.len; - // Filtered values are freed + const full_len = push_message.gossip_values.len; const valid_len = self.filterBasedOnShredVersion( gossip_table, push_message.gossip_values, push_message.from_pubkey.*, ); - n_invalid_data += push_message.gossip_values.len - valid_len; + const invalid_shred_count = full_len - valid_len; - try gossip_table.insertValuesMinAllocs( + try gossip_table.insertValuesWithResults( now, push_message.gossip_values[0..valid_len], PUSH_MSG_TIMEOUT.asMillis(), - &failed_insert_indexs, + &insert_results, ); - n_failed_inserts += failed_insert_indexs.items.len; + + var insert_fail_count: u64 = 0; + for (insert_results.items) |result| { + switch (result) { + .InsertedNewEntry => self.stats.push_message_n_new_inserts.inc(), + .OverwroteExistingEntry => |old_data| { + self.stats.push_message_n_overwrite_existing.inc(); + // if the value was overwritten, we need to free the old value + bincode.free(self.gossip_value_allocator, old_data); + }, + .IgnoredOldValue => self.stats.push_message_n_old_value.inc(), + .IgnoredDuplicateValue => self.stats.push_message_n_duplicate_value.inc(), + .IgnoredTimeout => self.stats.push_message_n_timeouts.inc(), + .GossipTableFull => {}, + } + if (!result.wasInserted()) { + insert_fail_count += 1; + } + } + self.stats.push_message_n_invalid_shred_version.add(invalid_shred_count); // logging this message takes too long and causes a bottleneck // self.logger @@ -1580,19 +1602,21 @@ pub const GossipService = struct { // .field("n_failed_inserts", failed_insert_indexs.items.len) // .debug("gossip: recv push_message"); - if (failed_insert_indexs.items.len == 0) { + if (insert_fail_count == 0) { // dont need to build prune messages continue; } - // Free failed inserts + // free failed inserts defer { - for (failed_insert_indexs.items) |failed_index| { - bincode.free(self.gossip_value_allocator, push_message.gossip_values[failed_index]); + for (insert_results.items, 0..) |result, index| { + if (!result.wasInserted()) { + bincode.free(self.gossip_value_allocator, push_message.gossip_values[index]); + } } } - // lookup contact info + // lookup contact info to send a prune message to const from_contact_info = gossip_table.getThreadSafeContactInfo(push_message.from_pubkey.*) orelse { // unable to find contact info continue; @@ -1615,15 +1639,14 @@ pub const GossipService = struct { } break :blk lookup_result.value_ptr; }; - for (failed_insert_indexs.items) |failed_index| { - const origin = push_message.gossip_values[failed_index].id(); - try failed_origins.put(origin, {}); + + for (insert_results.items, 0..) |result, index| { + if (!result.wasInserted()) { + const origin = push_message.gossip_values[index].id(); + try failed_origins.put(origin, {}); + } } } - - self.stats.push_message_n_values.add(n_gossip_data); - self.stats.push_message_n_invalid_values.add(n_failed_inserts); - self.stats.push_message_n_invalid_values.add(n_invalid_data); } // build prune packets @@ -1697,11 +1720,10 @@ pub const GossipService = struct { // TODO: condition timeout on stake weight: // - values from nodes with non-zero stake: epoch duration // - values from nodes with zero stake: - // - if all nodes have zero stake: epoch duration - // - if any other nodes have non-zero stake: GOSSIP_PULL_TIMEOUT_MS (15s) - const n_values_removed = try gossip_table.removeOldLabels(now, DEFAULT_EPOCH_DURATION); + // - if all nodes have zero stake: epoch duration (TODO: this might be unreasonably large) + // - if any other nodes have non-zero stake: DATA_TIMEOUT (15s) + const n_values_removed = try gossip_table.removeOldLabels(now, DEFAULT_EPOCH_DURATION.asMillis()); self.stats.table_old_values_removed.add(n_values_removed); - self.stats.table_remove_old_values_call_count.inc(); } const failed_insert_cutoff_timestamp = now -| FAILED_INSERTS_RETENTION.asMillis(); @@ -1876,7 +1898,9 @@ pub const GossipService = struct { while (i < gossip_values_array.items.len) { const gossip_value = &gossip_values[i]; switch (gossip_value.data) { - // always allow contact info + node instance to update shred versions + // always allow contact info + node instance to update shred versions. + // this also allows us to know who *not* to send pull requests to, if the shred version + // doesnt match ours .ContactInfo => {}, .LegacyContactInfo => {}, .NodeInstance => {}, @@ -1937,9 +1961,21 @@ pub const GossipStats = struct { pull_responses_sent: *Counter, prune_messages_sent: *Counter, - push_message_n_values: *Counter, - push_message_n_failed_inserts: *Counter, - push_message_n_invalid_values: *Counter, + // inserting push messages stats + push_message_n_invalid_shred_version: *Counter, + push_message_n_new_inserts: *Counter, + push_message_n_overwrite_existing: *Counter, + push_message_n_old_value: *Counter, + push_message_n_duplicate_value: *Counter, + push_message_n_timeouts: *Counter, + + // inserting pull response stats + pull_response_n_invalid_shred_version: *Counter, + pull_response_n_new_inserts: *Counter, + pull_response_n_overwrite_existing: *Counter, + pull_response_n_old_value: *Counter, + pull_response_n_duplicate_value: *Counter, + pull_response_n_timeouts: *Counter, handle_batch_ping_time: *Histogram, handle_batch_pong_time: *Histogram, @@ -1960,8 +1996,6 @@ pub const GossipStats = struct { table_n_pubkeys: *GaugeU64, table_pubkeys_dropped: *Counter, table_old_values_removed: *Counter, - table_trim_call_count: *Counter, - table_remove_old_values_call_count: *Counter, // logging details _logging_fields: struct { @@ -2789,6 +2823,7 @@ test "build pull requests" { var rando_keypair = try KeyPair.create(null); var value = try SignedGossipData.randomWithIndex(prng.random(), &rando_keypair, 0); value.wallclockPtr().* = now + 10 * i; + value.data.LegacyContactInfo.shred_version = contact_info.shred_version; _ = try lg.mut().insert(value, now + 10 * i); pc._setPong(value.data.LegacyContactInfo.id, value.data.LegacyContactInfo.gossip); diff --git a/src/gossip/table.zig b/src/gossip/table.zig index 7368568ca..e045f8d68 100644 --- a/src/gossip/table.zig +++ b/src/gossip/table.zig @@ -16,15 +16,14 @@ const LegacyContactInfo = sig.gossip.data.LegacyContactInfo; const ContactInfo = sig.gossip.data.ContactInfo; const ThreadSafeContactInfo = sig.gossip.data.ThreadSafeContactInfo; const ThreadPool = sig.sync.ThreadPool; -const Task = sig.sync.ThreadPool.Task; -const Batch = sig.sync.ThreadPool.Batch; const Hash = sig.core.hash.Hash; const Pubkey = sig.core.Pubkey; const SocketAddr = sig.net.SocketAddr; const PACKET_DATA_SIZE = sig.net.packet.PACKET_DATA_SIZE; pub const UNIQUE_PUBKEY_CAPACITY: usize = 8_192; -pub const MAX_TABLE_SIZE: usize = 100_000; // TODO: better value for this +// TODO: cli arg for this +pub const MAX_TABLE_SIZE: usize = 1_000_000; // TODO: better value for this pub const HashAndTime = struct { hash: Hash, timestamp: u64 }; @@ -33,24 +32,6 @@ pub fn AutoArrayHashSet(comptime T: type) type { return AutoArrayHashMap(T, void); } -pub const InsertResults = struct { - inserted: ?std.ArrayList(usize), - timeouts: ?std.ArrayList(usize), - failed: ?std.ArrayList(usize), - - pub fn deinit(self: InsertResults) void { - if (self.inserted) |inserted| { - inserted.deinit(); - } - if (self.timeouts) |timeouts| { - timeouts.deinit(); - } - if (self.failed) |failed| { - failed.deinit(); - } - } -}; - /// Cluster Replicated Data Store: stores gossip data /// the self.store uses an AutoArrayHashMap which is a HashMap that also allows for /// indexing values (value = arrayhashmap[0]). This allows us to insert data @@ -100,11 +81,6 @@ pub const GossipTable = struct { const Self = @This(); - pub const InsertionError = error{ - OldValue, - DuplicateValue, - }; - pub fn init(allocator: std.mem.Allocator, thread_pool: *ThreadPool) !Self { return Self{ .store = AutoArrayHashMap(GossipKey, GossipVersionedData).init(allocator), @@ -152,11 +128,21 @@ pub const GossipTable = struct { self.store.deinit(); } - pub fn insert(self: *Self, value: SignedGossipData, now: u64) !bool { - if (self.store.count() >= MAX_TABLE_SIZE) { - return error.GossipTableFull; + pub const InsertResult = union(enum) { + // possible values returned from insert() + InsertedNewEntry: void, + OverwroteExistingEntry: GossipData, // the overwritten value + IgnoredOldValue: void, + IgnoredDuplicateValue: void, + IgnoredTimeout: void, + GossipTableFull: void, + + pub fn wasInserted(self: InsertResult) bool { + return self == .InsertedNewEntry or self == .OverwroteExistingEntry; } + }; + pub fn insert(self: *Self, value: SignedGossipData, now: u64) !InsertResult { var buf: [PACKET_DATA_SIZE]u8 = undefined; const bytes = try bincode.writeToSlice(&buf, value, bincode.Params.standard); const value_hash = Hash.generateSha256Hash(bytes); @@ -174,6 +160,12 @@ pub const GossipTable = struct { // entry doesnt exist if (!result.found_existing) { + // if table is full, return early + if (self.store.count() >= MAX_TABLE_SIZE) { + _ = self.store.swapRemove(label); + return .GossipTableFull; + } + switch (value.data) { .ContactInfo => |*info| { try self.contact_infos.put(entry_index, {}); @@ -214,7 +206,9 @@ pub const GossipTable = struct { self.cursor += 1; - return true; + // inserted new entry + return .InsertedNewEntry; + // should overwrite existing entry } else if (versioned_value.overwrites(result.value_ptr)) { const old_entry = result.value_ptr.*; @@ -263,9 +257,10 @@ pub const GossipTable = struct { try self.purged.insert(old_entry.value_hash, now); result.value_ptr.* = versioned_value; - self.cursor += 1; - return true; + + // overwrite existing entry + return .{ .OverwroteExistingEntry = old_entry.value.data }; // do nothing } else { @@ -274,14 +269,27 @@ pub const GossipTable = struct { if (old_entry.value_hash.order(&versioned_value.value_hash) != .eq) { // if hash isnt the same and override() is false then msg is old try self.purged.insert(old_entry.value_hash, now); - return InsertionError.OldValue; + return .IgnoredOldValue; } else { - // hash is the same then its a duplicate - return InsertionError.DuplicateValue; + // hash is the same then its a duplicate value which isnt stored + return .IgnoredDuplicateValue; } + } + } - return false; + pub fn insertWithTimeout( + self: *Self, + value: SignedGossipData, + now: u64, + timeout: u64, + ) !InsertResult { + const value_time = value.wallclock(); + const is_too_new = value_time > now +| timeout; + const is_too_old = value_time < now -| timeout; + if (is_too_new or is_too_old) { + return .IgnoredTimeout; } + return self.insert(value, now); } pub fn insertValues( @@ -289,72 +297,26 @@ pub const GossipTable = struct { now: u64, values: []SignedGossipData, timeout: u64, - comptime record_inserts: bool, - comptime record_timeouts: bool, - ) error{OutOfMemory}!InsertResults { - // TODO: change to record duplicate and old values seperately + handle when - // gossip table is full - var failed_indexs = std.ArrayList(usize).init(self.allocator); - var inserted_indexs = std.ArrayList(usize).init(self.allocator); - var timeout_indexs = std.ArrayList(usize).init(self.allocator); - - for (values, 0..) |value, index| { - const value_time = value.wallclock(); - const is_too_new = value_time > now +| timeout; - const is_too_old = value_time < now -| timeout; - if (is_too_new or is_too_old) { - if (record_timeouts) { - try timeout_indexs.append(index); - } - continue; - } - - const was_inserted = self.insert(value, now) catch false; - if (was_inserted) { - try inserted_indexs.append(index); - } else { - try failed_indexs.append(index); - } - } - - return InsertResults{ - .inserted = if (record_inserts) inserted_indexs else null, - .timeouts = if (record_timeouts) timeout_indexs else null, - .failed = failed_indexs, - }; + ) !std.ArrayList(InsertResult) { + var results = std.ArrayList(InsertResult).init(self.allocator); + try self.insertValuesWithResults(now, values, timeout, &results); + return results; } - /// Like insertValues, but it minimizes the number of memory allocations. - /// - /// This is optimized to minimize the number of times that allocations occur. - /// It is *not* optimized to minimize overall memory usage. - /// - /// It accepts an arraylist of failures instead of returning an InsertResults, so it - /// can reuse the arraylist from a previous execution rather than allocating a new one. - /// - /// For simplicity and performance, only tracks failures without `inserted` and `timeouts`, - pub fn insertValuesMinAllocs( + /// Like insertValues, but it accepts an arraylist whose memory can be reused. + pub fn insertValuesWithResults( self: *Self, now: u64, values: []SignedGossipData, timeout: u64, - failed_indexes: *std.ArrayList(usize), - ) error{OutOfMemory}!void { - failed_indexes.clearRetainingCapacity(); - try failed_indexes.ensureTotalCapacity(values.len); - - for (values, 0..) |value, index| { - const value_time = value.wallclock(); - const is_too_new = value_time > now +| timeout; - const is_too_old = value_time < now -| timeout; - if (is_too_new or is_too_old) { - continue; - } - - const did_insert = self.insert(value, now) catch false; - if (!did_insert) { - failed_indexes.appendAssumeCapacity(index); - } + results: *std.ArrayList(InsertResult), + ) !void { + results.clearRetainingCapacity(); + try results.ensureTotalCapacity(values.len); + + for (values) |value| { + const result = try self.insertWithTimeout(value, now, timeout); + results.appendAssumeCapacity(result); } } @@ -759,27 +721,18 @@ pub const GossipTable = struct { return old_labels.items.len; } - const GetOldLabelsTask = struct { - // context - key: Pubkey, - table: *const GossipTable, - cutoff_timestamp: u64, - old_labels: std.ArrayList(GossipKey), - - // standard - task: Task = .{ .callback = callback }, - done: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), - - pub fn deinit(self: *GetOldLabelsTask) void { - self.old_labels.deinit(); - } - - pub fn callback(task: *Task) void { - const self: *@This() = @fieldParentPtr("task", task); - defer self.done.store(true, .release); + pub fn getOldLabels( + self: *Self, + now: u64, + timeout: u64, + ) error{OutOfMemory}!std.ArrayList(GossipKey) { + const cutoff_timestamp = now -| timeout; + const n_pubkeys = self.pubkey_to_values.count(); - // get assocaited entries - const entry = self.table.pubkey_to_values.getEntry(self.key).?; + var old_labels = std.ArrayList(GossipKey).init(self.allocator); + next_key: for (self.pubkey_to_values.keys()[0..n_pubkeys]) |key| { + // get associated entries + const entry = self.pubkey_to_values.getEntry(key).?; // if contact info is up to date then we dont need to check the values const pubkey = entry.key_ptr; @@ -788,13 +741,13 @@ pub const GossipTable = struct { GossipKey{ .ContactInfo = pubkey.* }, }; inline for (labels) |label| { - if (self.table.get(label)) |*contact_info| { + if (self.get(label)) |*contact_info| { const value_timestamp = @min( contact_info.value.wallclock(), contact_info.timestamp_on_insertion, ); - if (value_timestamp > self.cutoff_timestamp) { - return; + if (value_timestamp > cutoff_timestamp) { + continue :next_key; } } } @@ -804,62 +757,18 @@ pub const GossipTable = struct { const count = entry_indexs.count(); for (entry_indexs.iterator().keys[0..count]) |entry_index| { - const versioned_value = self.table.store.values()[entry_index]; + const versioned_value = self.store.values()[entry_index]; const value_timestamp = @min( versioned_value.value.wallclock(), versioned_value.timestamp_on_insertion, ); - if (value_timestamp <= self.cutoff_timestamp) { - self.old_labels.append(versioned_value.value.label()) catch unreachable; + if (value_timestamp <= cutoff_timestamp) { + old_labels.append(versioned_value.value.label()) catch unreachable; } } } - }; - - pub fn getOldLabels( - self: *Self, - now: u64, - timeout: u64, - ) error{OutOfMemory}!std.ArrayList(GossipKey) { - const cutoff_timestamp = now -| timeout; - const n_pubkeys = self.pubkey_to_values.count(); - - var tasks = try self.allocator.alloc(GetOldLabelsTask, n_pubkeys); - defer { - for (tasks) |*task| task.deinit(); - self.allocator.free(tasks); - } - - // run this loop in parallel - for (self.pubkey_to_values.keys()[0..n_pubkeys], 0..) |key, i| { - tasks[i] = GetOldLabelsTask{ - .key = key, - .table = self, - .cutoff_timestamp = cutoff_timestamp, - .old_labels = std.ArrayList(GossipKey).init(self.allocator), - }; - - // run it - const batch = Batch.from(&tasks[i].task); - self.thread_pool.schedule(batch); - } - - // wait for them to be done to release the lock - var output_length: u64 = 0; - for (tasks) |*task| { - while (!task.done.load(.acquire)) { - // wait - } - output_length += task.old_labels.items.len; - } - - // move labels to one big array - var output = try std.ArrayList(GossipKey).initCapacity(self.allocator, output_length); - for (tasks) |*task| { - output.appendSliceAssumeCapacity(task.old_labels.items); - } - return output; + return old_labels; } pub fn getOwnedContactInfoByGossipAddr( @@ -1128,7 +1037,8 @@ test "insert and get contact_info" { defer table.deinit(); // test insertion - _ = try table.insert(gossip_value, 0); + const result = try table.insert(gossip_value, 0); + try std.testing.expectEqual(.InsertedNewEntry, result); // test retrieval var buf: [100]ContactInfo = undefined; @@ -1137,13 +1047,14 @@ test "insert and get contact_info" { try std.testing.expect(nodes[0].pubkey.equals(&id)); // test re-insertion - const result = table.insert(gossip_value, 0); - try std.testing.expectError(GossipTable.InsertionError.DuplicateValue, result); + const result2 = try table.insert(gossip_value, 0); + try std.testing.expectEqual(.IgnoredDuplicateValue, result2); // test re-insertion with greater wallclock gossip_value.data.LegacyContactInfo.wallclock += 2; const v = gossip_value.data.LegacyContactInfo.wallclock; - _ = try table.insert(gossip_value, 0); + const result3 = try table.insert(gossip_value, 0); + try std.testing.expectEqual(.OverwroteExistingEntry, std.meta.activeTag(result3)); // check retrieval nodes = table.getContactInfos(&buf, 0);