From 96aeba26f2d1e22e988bb3dca9b78de10e5c9e88 Mon Sep 17 00:00:00 2001
From: Brian Olson
Date: Wed, 31 Aug 2022 11:55:30 -0400
Subject: [PATCH] TagCounter gains a tag set to filter on

unknown tags can be counted under one substitute tag e.g. "UNK"
limit protocol counters to known protocol tags
---
NOTE(review): this copy of the patch was rebuilt from a whitespace-collapsed
paste. Line breaks, tab indentation and blank context lines were restored so
that every hunk matches the line counts in its @@ header. Spacing inside
unchanged context lines (e.g. gofmt column alignment) is a best-effort
reconstruction -- confirm with `git apply --check`, and fall back to
`git apply --ignore-whitespace` if a context line does not match.

 network/wsPeer.go               | 17 +++++--
 protocol/tags.go                | 19 ++++++++
 util/metrics/tagcounter.go      | 27 ++++++++++++
 util/metrics/tagcounter_test.go | 78 +++++++++++++++++++++++++++++++++
 4 files changed, 137 insertions(+), 4 deletions(-)

diff --git a/network/wsPeer.go b/network/wsPeer.go
index 594accb3db..6ae9bc0d28 100644
--- a/network/wsPeer.go
+++ b/network/wsPeer.go
@@ -47,15 +47,24 @@ const averageMessageLength = 2 * 1024 // Most of the messages are smaller tha
 // buffer and starve messages from other peers.
 const msgsInReadBufferPerPeer = 10
 
+var tagStringList []string
+
+func init() {
+	tagStringList = make([]string, len(protocol.TagList))
+	for i, t := range protocol.TagList {
+		tagStringList[i] = string(t)
+	}
+}
+
 var networkSentBytesTotal = metrics.MakeCounter(metrics.NetworkSentBytesTotal)
-var networkSentBytesByTag = metrics.NewTagCounter("algod_network_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages")
+var networkSentBytesByTag = metrics.NewTagCounterFiltered("algod_network_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages", tagStringList, "UNK")
 var networkReceivedBytesTotal = metrics.MakeCounter(metrics.NetworkReceivedBytesTotal)
-var networkReceivedBytesByTag = metrics.NewTagCounter("algod_network_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages")
+var networkReceivedBytesByTag = metrics.NewTagCounterFiltered("algod_network_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages", tagStringList, "UNK")
 
 var networkMessageReceivedTotal = metrics.MakeCounter(metrics.NetworkMessageReceivedTotal)
-var networkMessageReceivedByTag = metrics.NewTagCounter("algod_network_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages")
+var networkMessageReceivedByTag = metrics.NewTagCounterFiltered("algod_network_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages", tagStringList, "UNK")
 var networkMessageSentTotal = metrics.MakeCounter(metrics.NetworkMessageSentTotal)
-var networkMessageSentByTag = metrics.NewTagCounter("algod_network_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages")
+var networkMessageSentByTag = metrics.NewTagCounterFiltered("algod_network_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages", tagStringList, "UNK")
 
 var networkConnectionsDroppedTotal = metrics.MakeCounter(metrics.NetworkConnectionsDroppedTotal)
 var networkMessageQueueMicrosTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_message_sent_queue_micros_total", Description: "Total microseconds message spent waiting in queue to be sent"})
diff --git a/protocol/tags.go b/protocol/tags.go
index f3bff635b5..01aee9126b 100644
--- a/protocol/tags.go
+++ b/protocol/tags.go
@@ -42,3 +42,22 @@ const (
 	//UniCatchupResTag Tag = "UT" was used for wsfetcherservice
 	VoteBundleTag Tag = "VB"
 )
+
+// TagList is a list of all currently used protocol tags.
+// TODO: generate this and/or have a test that it is complete.
+var TagList = []Tag{
+	UnknownMsgTag,
+	AgreementVoteTag,
+	MsgOfInterestTag,
+	MsgDigestSkipTag,
+	NetPrioResponseTag,
+	PingTag,
+	PingReplyTag,
+	ProposalPayloadTag,
+	StateProofSigTag,
+	TopicMsgRespTag,
+	TxnTag,
+	UniCatchupReqTag,
+	UniEnsBlockReqTag,
+	VoteBundleTag,
+}
diff --git a/util/metrics/tagcounter.go b/util/metrics/tagcounter.go
index d110b8d4f0..d4de0d53cc 100644
--- a/util/metrics/tagcounter.go
+++ b/util/metrics/tagcounter.go
@@ -24,6 +24,22 @@ import (
 	"github.com/algorand/go-deadlock"
 )
 
+// NewTagCounterFiltered makes a set of metrics under rootName for tagged counting.
+// "{TAG}" in rootName is replaced by the tag, otherwise "_{TAG}" is appended.
+// Tags not in allowedTags will be filtered out and ignored.
+// unknownTag may be "" or a value that will be counted for tags not in allowedTags.
+func NewTagCounterFiltered(rootName, desc string, allowedTags []string, unknownTag string) *TagCounter {
+	tc := &TagCounter{Name: rootName, Description: desc, UnknownTag: unknownTag}
+	if len(allowedTags) != 0 {
+		tc.AllowedTags = make(map[string]bool, len(allowedTags))
+		for _, tag := range allowedTags {
+			tc.AllowedTags[tag] = true
+		}
+	}
+	DefaultRegistry().Register(tc)
+	return tc
+}
+
 // NewTagCounter makes a set of metrics under rootName for tagged counting.
 // "{TAG}" in rootName is replaced by the tag, otherwise "_{TAG}" is appended.
 // Optionally provided declaredTags counters for these names up front (making them easier to discover).
@@ -41,6 +57,10 @@ type TagCounter struct {
 	Name        string
 	Description string
 
+	AllowedTags map[string]bool
+
+	UnknownTag string
+
 	// a read only race-free reference to tags
 	tagptr atomic.Value
 
@@ -54,6 +74,13 @@ type TagCounter struct {
 
 // Add t[tag] += val, fast and multithread safe
 func (tc *TagCounter) Add(tag string, val uint64) {
+	if (tc.AllowedTags != nil) && (!tc.AllowedTags[tag]) {
+		if len(tc.UnknownTag) != 0 {
+			tag = tc.UnknownTag
+		} else {
+			return
+		}
+	}
 	for {
 		var tags map[string]*uint64
 		tagptr := tc.tagptr.Load()
diff --git a/util/metrics/tagcounter_test.go b/util/metrics/tagcounter_test.go
index feb464a35c..9e8a507176 100644
--- a/util/metrics/tagcounter_test.go
+++ b/util/metrics/tagcounter_test.go
@@ -81,6 +81,84 @@ func TestTagCounter(t *testing.T) {
 	}
 }
 
+func TestTagCounterFilter(t *testing.T) {
+	partitiontest.PartitionTest(t)
+
+	tags := make([]string, 17)
+	for i := range tags {
+		tags[i] = fmt.Sprintf("A%c", 'A'+i)
+	}
+	goodTags := tags[:10]
+	badTags := tags[10:]
+	badCount := uint64(0)
+	//t.Logf("tags %v", tags)
+	countsIn := make([]uint64, len(tags))
+	for i := range countsIn {
+		countsIn[i] = uint64(10 * (i + 1))
+		if i >= 10 {
+			badCount += countsIn[i]
+		}
+	}
+
+	tc := NewTagCounterFiltered("tc", "wat", goodTags, "UNK")
+	DefaultRegistry().Deregister(tc)
+
+	// check that empty TagCounter cleanly returns no results
+	var sb strings.Builder
+	tc.WriteMetric(&sb, "")
+	require.Equal(t, "", sb.String())
+
+	result := make(map[string]float64)
+	tc.AddMetric(result)
+	require.Equal(t, 0, len(result))
+
+	var wg sync.WaitGroup
+	wg.Add(len(tags))
+
+	runf := func(tag string, count uint64) {
+		for i := 0; i < int(count); i++ {
+			tc.Add(tag, 1)
+		}
+		wg.Done()
+	}
+
+	for i, tag := range tags {
+		go runf(tag, countsIn[i])
+	}
+	wg.Wait()
+
+	endtags := tc.tagptr.Load().(map[string]*uint64)
+	for i, tag := range goodTags {
+		countin := countsIn[i]
+		endcountp := endtags[tag]
+		if endcountp == nil {
+			t.Errorf("tag[%d] %s nil counter", i, tag)
+			continue
+		}
+		endcount := *endcountp
+		if endcount != countin {
+			t.Errorf("tag[%d] %v wanted %d got %d", i, tag, countin, endcount)
+		}
+	}
+	for _, tag := range badTags {
+		endcountp := endtags[tag]
+		if endcountp == nil {
+			// best, nil entry, never touched the struct
+			continue
+		}
+		endcount := *endcountp
+		if endcount != 0 {
+			t.Errorf("bad tag %v wanted %d got %d", tag, 0, endcount)
+		}
+	}
+	endcountp := endtags["UNK"]
+	endcount := uint64(0)
+	if endcountp != nil {
+		endcount = *endcountp
+	}
+	require.Equal(t, badCount, endcount)
+}
+
 func TestTagCounterWriteMetric(t *testing.T) {
 	partitiontest.PartitionTest(t)
 