Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metrics: TagCounter gains a tag set to filter on #4503

Merged
merged 1 commit into from
Sep 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions network/wsPeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,24 @@ const averageMessageLength = 2 * 1024 // Most of the messages are smaller tha
// buffer and starve messages from other peers.
const msgsInReadBufferPerPeer = 10

var tagStringList []string

func init() {
tagStringList = make([]string, len(protocol.TagList))
for i, t := range protocol.TagList {
tagStringList[i] = string(t)
}
}

var networkSentBytesTotal = metrics.MakeCounter(metrics.NetworkSentBytesTotal)
var networkSentBytesByTag = metrics.NewTagCounter("algod_network_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages")
var networkSentBytesByTag = metrics.NewTagCounterFiltered("algod_network_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages", tagStringList, "UNK")
var networkReceivedBytesTotal = metrics.MakeCounter(metrics.NetworkReceivedBytesTotal)
var networkReceivedBytesByTag = metrics.NewTagCounter("algod_network_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages")
var networkReceivedBytesByTag = metrics.NewTagCounterFiltered("algod_network_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages", tagStringList, "UNK")

var networkMessageReceivedTotal = metrics.MakeCounter(metrics.NetworkMessageReceivedTotal)
var networkMessageReceivedByTag = metrics.NewTagCounter("algod_network_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages")
var networkMessageReceivedByTag = metrics.NewTagCounterFiltered("algod_network_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages", tagStringList, "UNK")
var networkMessageSentTotal = metrics.MakeCounter(metrics.NetworkMessageSentTotal)
var networkMessageSentByTag = metrics.NewTagCounter("algod_network_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages")
var networkMessageSentByTag = metrics.NewTagCounterFiltered("algod_network_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages", tagStringList, "UNK")

var networkConnectionsDroppedTotal = metrics.MakeCounter(metrics.NetworkConnectionsDroppedTotal)
var networkMessageQueueMicrosTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_message_sent_queue_micros_total", Description: "Total microseconds message spent waiting in queue to be sent"})
Expand Down
19 changes: 19 additions & 0 deletions protocol/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,22 @@ const (
//UniCatchupResTag Tag = "UT" was used for wsfetcherservice
VoteBundleTag Tag = "VB"
)

// TagList is a list of all currently used protocol tags.
// TODO: generate this and/or have a test that it is complete.
var TagList = []Tag{
UnknownMsgTag,
AgreementVoteTag,
MsgOfInterestTag,
MsgDigestSkipTag,
NetPrioResponseTag,
PingTag,
PingReplyTag,
ProposalPayloadTag,
StateProofSigTag,
TopicMsgRespTag,
TxnTag,
UniCatchupReqTag,
UniEnsBlockReqTag,
VoteBundleTag,
}
27 changes: 27 additions & 0 deletions util/metrics/tagcounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,22 @@ import (
"github.com/algorand/go-deadlock"
)

// NewTagCounterFiltered makes a set of metrics under rootName for tagged counting.
// "{TAG}" in rootName is replaced by the tag, otherwise "_{TAG}" is appended.
// Tags not in allowedTags will be filtered out and ignored.
// unknownTag may be "" or a value that will be counted for tags not in allowedTags.
func NewTagCounterFiltered(rootName, desc string, allowedTags []string, unknownTag string) *TagCounter {
tc := &TagCounter{Name: rootName, Description: desc, UnknownTag: unknownTag}
if len(allowedTags) != 0 {
tc.AllowedTags = make(map[string]bool, len(allowedTags))
for _, tag := range allowedTags {
tc.AllowedTags[tag] = true
}
}
DefaultRegistry().Register(tc)
return tc
}

// NewTagCounter makes a set of metrics under rootName for tagged counting.
// "{TAG}" in rootName is replaced by the tag, otherwise "_{TAG}" is appended.
// Optionally provided declaredTags counters for these names up front (making them easier to discover).
Expand All @@ -41,6 +57,10 @@ type TagCounter struct {
Name string
Description string

AllowedTags map[string]bool

UnknownTag string

// a read only race-free reference to tags
tagptr atomic.Value

Expand All @@ -54,6 +74,13 @@ type TagCounter struct {

// Add t[tag] += val, fast and multithread safe
func (tc *TagCounter) Add(tag string, val uint64) {
if (tc.AllowedTags != nil) && (!tc.AllowedTags[tag]) {
if len(tc.UnknownTag) != 0 {
tag = tc.UnknownTag
} else {
return
}
}
for {
var tags map[string]*uint64
tagptr := tc.tagptr.Load()
Expand Down
78 changes: 78 additions & 0 deletions util/metrics/tagcounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,84 @@ func TestTagCounter(t *testing.T) {
}
}

func TestTagCounterFilter(t *testing.T) {
partitiontest.PartitionTest(t)

tags := make([]string, 17)
for i := range tags {
tags[i] = fmt.Sprintf("A%c", 'A'+i)
}
goodTags := tags[:10]
badTags := tags[10:]
badCount := uint64(0)
//t.Logf("tags %v", tags)
countsIn := make([]uint64, len(tags))
for i := range countsIn {
countsIn[i] = uint64(10 * (i + 1))
if i >= 10 {
badCount += countsIn[i]
}
}

tc := NewTagCounterFiltered("tc", "wat", goodTags, "UNK")
DefaultRegistry().Deregister(tc)

// check that empty TagCounter cleanly returns no results
var sb strings.Builder
tc.WriteMetric(&sb, "")
require.Equal(t, "", sb.String())

result := make(map[string]float64)
tc.AddMetric(result)
require.Equal(t, 0, len(result))

var wg sync.WaitGroup
wg.Add(len(tags))

runf := func(tag string, count uint64) {
for i := 0; i < int(count); i++ {
tc.Add(tag, 1)
}
wg.Done()
}

for i, tag := range tags {
go runf(tag, countsIn[i])
}
wg.Wait()

endtags := tc.tagptr.Load().(map[string]*uint64)
for i, tag := range goodTags {
countin := countsIn[i]
endcountp := endtags[tag]
if endcountp == nil {
t.Errorf("tag[%d] %s nil counter", i, tag)
continue
}
endcount := *endcountp
if endcount != countin {
t.Errorf("tag[%d] %v wanted %d got %d", i, tag, countin, endcount)
}
}
for _, tag := range badTags {
endcountp := endtags[tag]
if endcountp == nil {
// best, nil entry, never touched the struct
continue
}
endcount := *endcountp
if endcount != 0 {
t.Errorf("bad tag %v wanted %d got %d", tag, 0, endcount)
}
}
endcountp := endtags["UNK"]
endcount := uint64(0)
if endcountp != nil {
endcount = *endcountp
}
require.Equal(t, badCount, endcount)
}

func TestTagCounterWriteMetric(t *testing.T) {
partitiontest.PartitionTest(t)

Expand Down