Skip to content

Commit

Permalink
TagCounter gains a tag set to filter on
Browse files Browse the repository at this point in the history
unknown tags can be counted under one substitute tag e.g. "UNK"
limit protocol counters to known protocol tags
  • Loading branch information
brianolson committed Aug 31, 2022
1 parent e83acec commit 96aeba2
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 4 deletions.
17 changes: 13 additions & 4 deletions network/wsPeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,24 @@ const averageMessageLength = 2 * 1024 // Most of the messages are smaller tha
// buffer and starve messages from other peers.
const msgsInReadBufferPerPeer = 10

var tagStringList []string

func init() {
tagStringList = make([]string, len(protocol.TagList))
for i, t := range protocol.TagList {
tagStringList[i] = string(t)
}
}

var networkSentBytesTotal = metrics.MakeCounter(metrics.NetworkSentBytesTotal)
var networkSentBytesByTag = metrics.NewTagCounter("algod_network_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages")
var networkSentBytesByTag = metrics.NewTagCounterFiltered("algod_network_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages", tagStringList, "UNK")
var networkReceivedBytesTotal = metrics.MakeCounter(metrics.NetworkReceivedBytesTotal)
var networkReceivedBytesByTag = metrics.NewTagCounter("algod_network_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages")
var networkReceivedBytesByTag = metrics.NewTagCounterFiltered("algod_network_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages", tagStringList, "UNK")

var networkMessageReceivedTotal = metrics.MakeCounter(metrics.NetworkMessageReceivedTotal)
var networkMessageReceivedByTag = metrics.NewTagCounter("algod_network_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages")
var networkMessageReceivedByTag = metrics.NewTagCounterFiltered("algod_network_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages", tagStringList, "UNK")
var networkMessageSentTotal = metrics.MakeCounter(metrics.NetworkMessageSentTotal)
var networkMessageSentByTag = metrics.NewTagCounter("algod_network_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages")
var networkMessageSentByTag = metrics.NewTagCounterFiltered("algod_network_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages", tagStringList, "UNK")

var networkConnectionsDroppedTotal = metrics.MakeCounter(metrics.NetworkConnectionsDroppedTotal)
var networkMessageQueueMicrosTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_message_sent_queue_micros_total", Description: "Total microseconds message spent waiting in queue to be sent"})
Expand Down
19 changes: 19 additions & 0 deletions protocol/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,22 @@ const (
//UniCatchupResTag Tag = "UT" was used for wsfetcherservice
VoteBundleTag Tag = "VB"
)

// TagList is a list of all currently used protocol tags.
// TODO: generate this and/or have a test that it is complete.
var TagList = []Tag{
UnknownMsgTag,
AgreementVoteTag,
MsgOfInterestTag,
MsgDigestSkipTag,
NetPrioResponseTag,
PingTag,
PingReplyTag,
ProposalPayloadTag,
StateProofSigTag,
TopicMsgRespTag,
TxnTag,
UniCatchupReqTag,
UniEnsBlockReqTag,
VoteBundleTag,
}
27 changes: 27 additions & 0 deletions util/metrics/tagcounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,22 @@ import (
"github.com/algorand/go-deadlock"
)

// NewTagCounterFiltered makes a set of metrics under rootName for tagged counting.
// "{TAG}" in rootName is replaced by the tag, otherwise "_{TAG}" is appended.
// Tags not in allowedTags will be filtered out and ignored.
// unknownTag may be "" or a value that will be counted for tags not in allowedTags.
func NewTagCounterFiltered(rootName, desc string, allowedTags []string, unknownTag string) *TagCounter {
tc := &TagCounter{Name: rootName, Description: desc, UnknownTag: unknownTag}
if len(allowedTags) != 0 {
tc.AllowedTags = make(map[string]bool, len(allowedTags))
for _, tag := range allowedTags {
tc.AllowedTags[tag] = true
}
}
DefaultRegistry().Register(tc)
return tc
}

// NewTagCounter makes a set of metrics under rootName for tagged counting.
// "{TAG}" in rootName is replaced by the tag, otherwise "_{TAG}" is appended.
// Optionally provided declaredTags counters for these names up front (making them easier to discover).
Expand All @@ -41,6 +57,10 @@ type TagCounter struct {
Name string
Description string

AllowedTags map[string]bool

UnknownTag string

// a read only race-free reference to tags
tagptr atomic.Value

Expand All @@ -54,6 +74,13 @@ type TagCounter struct {

// Add t[tag] += val, fast and multithread safe
func (tc *TagCounter) Add(tag string, val uint64) {
if (tc.AllowedTags != nil) && (!tc.AllowedTags[tag]) {
if len(tc.UnknownTag) != 0 {
tag = tc.UnknownTag
} else {
return
}
}
for {
var tags map[string]*uint64
tagptr := tc.tagptr.Load()
Expand Down
78 changes: 78 additions & 0 deletions util/metrics/tagcounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,84 @@ func TestTagCounter(t *testing.T) {
}
}

func TestTagCounterFilter(t *testing.T) {
partitiontest.PartitionTest(t)

tags := make([]string, 17)
for i := range tags {
tags[i] = fmt.Sprintf("A%c", 'A'+i)
}
goodTags := tags[:10]
badTags := tags[10:]
badCount := uint64(0)
//t.Logf("tags %v", tags)
countsIn := make([]uint64, len(tags))
for i := range countsIn {
countsIn[i] = uint64(10 * (i + 1))
if i >= 10 {
badCount += countsIn[i]
}
}

tc := NewTagCounterFiltered("tc", "wat", goodTags, "UNK")
DefaultRegistry().Deregister(tc)

// check that empty TagCounter cleanly returns no results
var sb strings.Builder
tc.WriteMetric(&sb, "")
require.Equal(t, "", sb.String())

result := make(map[string]float64)
tc.AddMetric(result)
require.Equal(t, 0, len(result))

var wg sync.WaitGroup
wg.Add(len(tags))

runf := func(tag string, count uint64) {
for i := 0; i < int(count); i++ {
tc.Add(tag, 1)
}
wg.Done()
}

for i, tag := range tags {
go runf(tag, countsIn[i])
}
wg.Wait()

endtags := tc.tagptr.Load().(map[string]*uint64)
for i, tag := range goodTags {
countin := countsIn[i]
endcountp := endtags[tag]
if endcountp == nil {
t.Errorf("tag[%d] %s nil counter", i, tag)
continue
}
endcount := *endcountp
if endcount != countin {
t.Errorf("tag[%d] %v wanted %d got %d", i, tag, countin, endcount)
}
}
for _, tag := range badTags {
endcountp := endtags[tag]
if endcountp == nil {
// best, nil entry, never touched the struct
continue
}
endcount := *endcountp
if endcount != 0 {
t.Errorf("bad tag %v wanted %d got %d", tag, 0, endcount)
}
}
endcountp := endtags["UNK"]
endcount := uint64(0)
if endcountp != nil {
endcount = *endcountp
}
require.Equal(t, badCount, endcount)
}

func TestTagCounterWriteMetric(t *testing.T) {
partitiontest.PartitionTest(t)

Expand Down

0 comments on commit 96aeba2

Please sign in to comment.