Skip to content

Commit

Permalink
TagCounter gains a tag set to filter on (#4503)
Browse files Browse the repository at this point in the history
unknown tags can be counted under one substitute tag e.g. "UNK"
limit protocol counters to known protocol tags
  • Loading branch information
brianolson authored Sep 8, 2022
1 parent d3ccb11 commit 3c64c26
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 4 deletions.
17 changes: 13 additions & 4 deletions network/wsPeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,24 @@ const averageMessageLength = 2 * 1024 // Most of the messages are smaller tha
// buffer and starve messages from other peers.
const msgsInReadBufferPerPeer = 10

var tagStringList []string

func init() {
tagStringList = make([]string, len(protocol.TagList))
for i, t := range protocol.TagList {
tagStringList[i] = string(t)
}
}

var networkSentBytesTotal = metrics.MakeCounter(metrics.NetworkSentBytesTotal)
var networkSentBytesByTag = metrics.NewTagCounter("algod_network_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages")
var networkSentBytesByTag = metrics.NewTagCounterFiltered("algod_network_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages", tagStringList, "UNK")
var networkReceivedBytesTotal = metrics.MakeCounter(metrics.NetworkReceivedBytesTotal)
var networkReceivedBytesByTag = metrics.NewTagCounter("algod_network_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages")
var networkReceivedBytesByTag = metrics.NewTagCounterFiltered("algod_network_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages", tagStringList, "UNK")

var networkMessageReceivedTotal = metrics.MakeCounter(metrics.NetworkMessageReceivedTotal)
var networkMessageReceivedByTag = metrics.NewTagCounter("algod_network_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages")
var networkMessageReceivedByTag = metrics.NewTagCounterFiltered("algod_network_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages", tagStringList, "UNK")
var networkMessageSentTotal = metrics.MakeCounter(metrics.NetworkMessageSentTotal)
var networkMessageSentByTag = metrics.NewTagCounter("algod_network_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages")
var networkMessageSentByTag = metrics.NewTagCounterFiltered("algod_network_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages", tagStringList, "UNK")

var networkConnectionsDroppedTotal = metrics.MakeCounter(metrics.NetworkConnectionsDroppedTotal)
var networkMessageQueueMicrosTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_message_sent_queue_micros_total", Description: "Total microseconds message spent waiting in queue to be sent"})
Expand Down
19 changes: 19 additions & 0 deletions protocol/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,22 @@ const (
//UniCatchupResTag Tag = "UT" was used for wsfetcherservice
VoteBundleTag Tag = "VB"
)

// TagList is a list of all currently used protocol tags.
// TODO: generate this and/or have a test that it is complete.
var TagList = []Tag{
UnknownMsgTag,
AgreementVoteTag,
MsgOfInterestTag,
MsgDigestSkipTag,
NetPrioResponseTag,
PingTag,
PingReplyTag,
ProposalPayloadTag,
StateProofSigTag,
TopicMsgRespTag,
TxnTag,
UniCatchupReqTag,
UniEnsBlockReqTag,
VoteBundleTag,
}
27 changes: 27 additions & 0 deletions util/metrics/tagcounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,22 @@ import (
"github.com/algorand/go-deadlock"
)

// NewTagCounterFiltered makes a set of metrics under rootName for tagged counting.
// "{TAG}" in rootName is replaced by the tag, otherwise "_{TAG}" is appended.
// Tags not in allowedTags will be filtered out and ignored.
// unknownTag may be "" or a value that will be counted for tags not in allowedTags.
func NewTagCounterFiltered(rootName, desc string, allowedTags []string, unknownTag string) *TagCounter {
tc := &TagCounter{Name: rootName, Description: desc, UnknownTag: unknownTag}
if len(allowedTags) != 0 {
tc.AllowedTags = make(map[string]bool, len(allowedTags))
for _, tag := range allowedTags {
tc.AllowedTags[tag] = true
}
}
DefaultRegistry().Register(tc)
return tc
}

// NewTagCounter makes a set of metrics under rootName for tagged counting.
// "{TAG}" in rootName is replaced by the tag, otherwise "_{TAG}" is appended.
// Optionally provided declaredTags counters for these names up front (making them easier to discover).
Expand All @@ -41,6 +57,10 @@ type TagCounter struct {
Name string
Description string

AllowedTags map[string]bool

UnknownTag string

// a read only race-free reference to tags
tagptr atomic.Value

Expand All @@ -54,6 +74,13 @@ type TagCounter struct {

// Add t[tag] += val, fast and multithread safe
func (tc *TagCounter) Add(tag string, val uint64) {
if (tc.AllowedTags != nil) && (!tc.AllowedTags[tag]) {
if len(tc.UnknownTag) != 0 {
tag = tc.UnknownTag
} else {
return
}
}
for {
var tags map[string]*uint64
tagptr := tc.tagptr.Load()
Expand Down
78 changes: 78 additions & 0 deletions util/metrics/tagcounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,84 @@ func TestTagCounter(t *testing.T) {
}
}

func TestTagCounterFilter(t *testing.T) {
partitiontest.PartitionTest(t)

tags := make([]string, 17)
for i := range tags {
tags[i] = fmt.Sprintf("A%c", 'A'+i)
}
goodTags := tags[:10]
badTags := tags[10:]
badCount := uint64(0)
//t.Logf("tags %v", tags)
countsIn := make([]uint64, len(tags))
for i := range countsIn {
countsIn[i] = uint64(10 * (i + 1))
if i >= 10 {
badCount += countsIn[i]
}
}

tc := NewTagCounterFiltered("tc", "wat", goodTags, "UNK")
DefaultRegistry().Deregister(tc)

// check that empty TagCounter cleanly returns no results
var sb strings.Builder
tc.WriteMetric(&sb, "")
require.Equal(t, "", sb.String())

result := make(map[string]float64)
tc.AddMetric(result)
require.Equal(t, 0, len(result))

var wg sync.WaitGroup
wg.Add(len(tags))

runf := func(tag string, count uint64) {
for i := 0; i < int(count); i++ {
tc.Add(tag, 1)
}
wg.Done()
}

for i, tag := range tags {
go runf(tag, countsIn[i])
}
wg.Wait()

endtags := tc.tagptr.Load().(map[string]*uint64)
for i, tag := range goodTags {
countin := countsIn[i]
endcountp := endtags[tag]
if endcountp == nil {
t.Errorf("tag[%d] %s nil counter", i, tag)
continue
}
endcount := *endcountp
if endcount != countin {
t.Errorf("tag[%d] %v wanted %d got %d", i, tag, countin, endcount)
}
}
for _, tag := range badTags {
endcountp := endtags[tag]
if endcountp == nil {
// best, nil entry, never touched the struct
continue
}
endcount := *endcountp
if endcount != 0 {
t.Errorf("bad tag %v wanted %d got %d", tag, 0, endcount)
}
}
endcountp := endtags["UNK"]
endcount := uint64(0)
if endcountp != nil {
endcount = *endcountp
}
require.Equal(t, badCount, endcount)
}

func TestTagCounterWriteMetric(t *testing.T) {
partitiontest.PartitionTest(t)

Expand Down

0 comments on commit 3c64c26

Please sign in to comment.