Commit

use SetGoroutineLabels
cce committed Sep 14, 2022
1 parent fee4854 commit f458336
Showing 10 changed files with 53 additions and 42 deletions.
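The same pattern repeats across all ten files: instead of wrapping each go statement in pprof.Do(...) at the spawn site, the spawned goroutine now labels itself by calling a new util.SetGoroutineLabels helper as its first action. A minimal sketch of the before/after shapes, with illustrative names that are not from the commit:

package main

import (
    "context"
    "runtime/pprof"

    "github.com/algorand/go-algorand/util"
)

// Old shape: pprof.Do labels the calling goroutine while the callback runs,
// and the goroutine spawned inside inherits those labels at creation time.
func startOld() {
    pprof.Do(context.Background(), pprof.Labels("worker", "example"), func(_ context.Context) {
        go workerOld()
    })
}

func workerOld() { /* ... */ }

// New shape: the go statement needs no wrapper; the goroutine labels itself.
func startNew() {
    go workerNew()
}

func workerNew() {
    util.SetGoroutineLabels("func", "example.worker")
    // ... work ...
}

func main() {
    startOld()
    startNew()
}

Both shapes attach the same pprof labels; the new one keeps the spawn sites short and puts the label next to the code it describes.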
2 changes: 2 additions & 0 deletions agreement/demux.go
@@ -25,6 +25,7 @@ import (
 	"github.com/algorand/go-algorand/logging"
 	"github.com/algorand/go-algorand/logging/logspec"
 	"github.com/algorand/go-algorand/protocol"
+	"github.com/algorand/go-algorand/util"
 )
 
 const (
@@ -113,6 +114,7 @@ func (d *demux) tokenizeMessages(ctx context.Context, net Network, tag protocol.
 	defer func() {
 		close(decoded)
 	}()
+	util.SetGoroutineLabels("func", "demux.tokenizeMessages", "tag", string(tag))
 	for {
 		select {
 		case raw, ok := <-networkMessages:
12 changes: 5 additions & 7 deletions agreement/service.go
@@ -19,12 +19,12 @@ package agreement
 //go:generate dbgen -i agree.sql -p agreement -n agree -o agreeInstall.go -h ../scripts/LICENSE_HEADER
 import (
 	"context"
-	"runtime/pprof"
 	"time"
 
 	"github.com/algorand/go-algorand/config"
 	"github.com/algorand/go-algorand/logging"
 	"github.com/algorand/go-algorand/protocol"
+	"github.com/algorand/go-algorand/util"
 	"github.com/algorand/go-algorand/util/db"
 	"github.com/algorand/go-algorand/util/execpool"
 	"github.com/algorand/go-algorand/util/timers"
@@ -144,12 +144,8 @@ func (s *Service) Start() {
 	input := make(chan externalEvent)
 	output := make(chan []action)
 	ready := make(chan externalDemuxSignals)
-	pprof.Do(context.Background(), pprof.Labels("worker", "agreement.demux"), func(_ context.Context) {
-		go s.demuxLoop(ctx, input, output, ready)
-	})
-	pprof.Do(context.Background(), pprof.Labels("worker", "agreement.main"), func(_ context.Context) {
-		go s.mainLoop(input, output, ready)
-	})
+	go s.demuxLoop(ctx, input, output, ready)
+	go s.mainLoop(input, output, ready)
 }
 
 // Shutdown the execution of the protocol.
@@ -164,6 +160,7 @@ func (s *Service) Shutdown() {
 
 // demuxLoop repeatedly executes pending actions and then requests the next event from the Service.demux.
 func (s *Service) demuxLoop(ctx context.Context, input chan<- externalEvent, output <-chan []action, ready <-chan externalDemuxSignals) {
+	util.SetGoroutineLabels("func", "agreement.demuxLoop")
 	for a := range output {
 		s.do(ctx, a)
 		extSignals := <-ready
@@ -188,6 +185,7 @@ func (s *Service) demuxLoop(ctx context.Context, input chan<- externalEvent, output <-chan []action, ready <-chan externalDemuxSignals) {
 // 3. Drive the state machine with this input to obtain a slice of pending actions.
 // 4. If necessary, persist state to disk.
 func (s *Service) mainLoop(input <-chan externalEvent, output chan<- []action, ready chan<- externalDemuxSignals) {
+	util.SetGoroutineLabels("func", "agreement.mainLoop")
 	// setup
 	var clock timers.Clock
 	var router rootRouter
7 changes: 7 additions & 0 deletions data/transactions/payset.go
@@ -19,6 +19,7 @@ package transactions
 import (
 	"github.com/algorand/go-algorand/crypto"
 	"github.com/algorand/go-algorand/protocol"
+	"github.com/algorand/msgp/msgp"
 )
 
 type (
@@ -27,6 +28,12 @@ type (
 	Payset []SignedTxnInBlock
 )
 
+type (
+	// A Payset represents a common, unforgeable, consistent, ordered set of SignedTxn objects.
+	//msgp:allocbound Payset 100000
+	RawPayset msgp.Raw
+)
+
 // CommitFlat returns a commitment to the Payset, as a flat array.
 func (payset Payset) CommitFlat() crypto.Digest {
 	return payset.commit(false)
7 changes: 3 additions & 4 deletions data/txHandler.go
@@ -21,7 +21,6 @@ import (
 	"context"
 	"fmt"
 	"io"
-	"runtime/pprof"
 	"sync"
 
 	"github.com/algorand/go-algorand/config"
@@ -33,6 +32,7 @@ import (
 	"github.com/algorand/go-algorand/logging"
 	"github.com/algorand/go-algorand/network"
 	"github.com/algorand/go-algorand/protocol"
+	"github.com/algorand/go-algorand/util"
 	"github.com/algorand/go-algorand/util/execpool"
 	"github.com/algorand/go-algorand/util/metrics"
 )
@@ -103,9 +103,7 @@ func (handler *TxHandler) Start() {
 		{Tag: protocol.TxnTag, MessageHandler: network.HandlerFunc(handler.processIncomingTxn)},
 	})
 	handler.backlogWg.Add(1)
-	pprof.Do(context.Background(), pprof.Labels("worker", "TxHandler.backlogWorker"), func(_ context.Context) {
-		go handler.backlogWorker()
-	})
+	go handler.backlogWorker()
 }
 
 // Stop suspends the processing of incoming messages at the transaction handler
@@ -126,6 +124,7 @@ func reencode(stxns []transactions.SignedTxn) []byte {
 // and dispatches them further.
 func (handler *TxHandler) backlogWorker() {
 	defer handler.backlogWg.Done()
+	util.SetGoroutineLabels("func", "TxHandler.backlogWorker")
 	for {
 		// prioritize the postVerificationQueue
 		select {
7 changes: 3 additions & 4 deletions ledger/notifier.go
@@ -19,14 +19,14 @@ package ledger
 import (
 	"context"
 	"database/sql"
-	"runtime/pprof"
 	"sync"
 
 	"github.com/algorand/go-deadlock"
 
 	"github.com/algorand/go-algorand/data/basics"
 	"github.com/algorand/go-algorand/data/bookkeeping"
 	"github.com/algorand/go-algorand/ledger/ledgercore"
+	"github.com/algorand/go-algorand/util"
 )
 
 // BlockListener represents an object that needs to get notified on new blocks.
@@ -51,6 +51,7 @@ type blockNotifier struct {
 
 func (bn *blockNotifier) worker() {
 	defer bn.closing.Done()
+	util.SetGoroutineLabels("func", "blockNotifier.worker")
 	bn.mu.Lock()
 
 	for {
@@ -93,9 +94,7 @@ func (bn *blockNotifier) loadFromDisk(l ledgerForTracker, _ basics.Round) error
 	bn.running = true
 	bn.pendingBlocks = nil
 	bn.closing.Add(1)
-	pprof.Do(context.Background(), pprof.Labels("worker", "blockNotifier"), func(_ context.Context) {
-		go bn.worker()
-	})
+	go bn.worker()
 	return nil
 }
 
19 changes: 8 additions & 11 deletions network/wsNetwork.go
@@ -30,7 +30,6 @@ import (
 	"path"
 	"regexp"
 	"runtime"
-	"runtime/pprof"
 	"strconv"
 	"strings"
 	"sync"
@@ -812,9 +811,7 @@ func (wn *WebsocketNetwork) Start() {
 	}
 	if wn.listener != nil {
 		wn.wg.Add(1)
-		pprof.Do(context.Background(), pprof.Labels("worker", "httpdThread"), func(ctx context.Context) {
-			go wn.httpdThread()
-		})
+		go wn.httpdThread()
 	}
 	wn.wg.Add(1)
 	go wn.meshThread()
@@ -824,13 +821,11 @@
 		wn.peersConnectivityCheckTicker.Stop()
 	}
 	wn.peersConnectivityCheckTicker = time.NewTicker(connectionActivityMonitorInterval)
-	pprof.Do(context.Background(), pprof.Labels("worker", "messageHandlerThread"), func(_ context.Context) {
-		for i := 0; i < incomingThreads; i++ {
-			wn.wg.Add(1)
-			// We pass the peersConnectivityCheckTicker.C here so that we don't need to syncronize the access to the ticker's data structure.
-			go wn.messageHandlerThread(wn.peersConnectivityCheckTicker.C)
-		}
-	})
+	for i := 0; i < incomingThreads; i++ {
+		wn.wg.Add(1)
+		// We pass the peersConnectivityCheckTicker.C here so that we don't need to syncronize the access to the ticker's data structure.
+		go wn.messageHandlerThread(wn.peersConnectivityCheckTicker.C)
+	}
 	wn.wg.Add(1)
 	go wn.broadcastThread()
 	if wn.prioScheme != nil {
@@ -845,6 +840,7 @@
 
 func (wn *WebsocketNetwork) httpdThread() {
 	defer wn.wg.Done()
+	util.SetGoroutineLabels("func", "network.httpdThread")
 	var err error
 	if wn.config.TLSCertFile != "" && wn.config.TLSKeyFile != "" {
 		err = wn.server.ServeTLS(wn.listener, wn.config.TLSCertFile, wn.config.TLSKeyFile)
@@ -1190,6 +1186,7 @@ func (wn *WebsocketNetwork) maybeSendMessagesOfInterest(peer *wsPeer, messagesOf
 
 func (wn *WebsocketNetwork) messageHandlerThread(peersConnectivityCheckCh <-chan time.Time) {
 	defer wn.wg.Done()
+	util.SetGoroutineLabels("func", "network.messageHandlerThread")
 
 	for {
 		select {
9 changes: 5 additions & 4 deletions node/node.go
@@ -23,7 +23,6 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
-	"runtime/pprof"
 	"strings"
 	"sync"
 	"time"
@@ -50,6 +49,7 @@ import (
 	"github.com/algorand/go-algorand/protocol"
 	"github.com/algorand/go-algorand/rpcs"
 	"github.com/algorand/go-algorand/stateproof"
+	"github.com/algorand/go-algorand/util"
 	"github.com/algorand/go-algorand/util/db"
 	"github.com/algorand/go-algorand/util/execpool"
 	"github.com/algorand/go-algorand/util/metrics"
@@ -384,9 +384,7 @@ func (node *AlgorandFullNode) startMonitoringRoutines() {
 	node.monitoringRoutinesWaitGroup.Add(2)
 	go node.txPoolGaugeThread(node.ctx.Done())
 	// Delete old participation keys
-	pprof.Do(context.Background(), pprof.Labels("worker", "oldKeyDeletionThread"), func(_ context.Context) {
-		go node.oldKeyDeletionThread(node.ctx.Done())
-	})
+	go node.oldKeyDeletionThread(node.ctx.Done())
 	// TODO re-enable with configuration flag post V1
 	//go logging.UsageLogThread(node.ctx, node.log, 100*time.Millisecond, nil)
 }
@@ -990,6 +988,7 @@ var txPoolGuage = metrics.MakeGauge(metrics.MetricName{Name: "algod_tx_pool_coun
 
 func (node *AlgorandFullNode) txPoolGaugeThread(done <-chan struct{}) {
 	defer node.monitoringRoutinesWaitGroup.Done()
+	util.SetGoroutineLabels("func", "node.txPoolGaugeThread")
 	ticker := time.NewTicker(10 * time.Second)
 	defer ticker.Stop()
 	for true {
@@ -1029,6 +1028,8 @@ func (node *AlgorandFullNode) OnNewBlock(block bookkeeping.Block, delta ledgerco
 // don't have to delete key for each block we received.
 func (node *AlgorandFullNode) oldKeyDeletionThread(done <-chan struct{}) {
 	defer node.monitoringRoutinesWaitGroup.Done()
+	util.SetGoroutineLabels("func", "node.oldKeyDeletionThread")
+
 	for {
 		select {
 		case <-done:
10 changes: 5 additions & 5 deletions util/execpool/backlog.go
@@ -18,8 +18,9 @@ package execpool
 
 import (
 	"context"
-	"runtime/pprof"
 	"sync"
+
+	"github.com/algorand/go-algorand/util"
 )
 
 // A backlog for an execution pool. The typical usage of this is to
@@ -68,9 +69,7 @@ func MakeBacklog(execPool ExecutionPool, backlogSize int, priority Priority, own
 	bl.buffer = make(chan backlogItemTask, backlogSize)
 
 	bl.wg.Add(1)
-	pprof.Do(context.Background(), pprof.Labels(profLabels...), func(_ context.Context) {
-		go bl.worker()
-	})
+	go bl.worker(profLabels)
 	return bl
 }
 
@@ -126,10 +125,11 @@ func (b *backlog) Shutdown() {
 	}
 }
 
-func (b *backlog) worker() {
+func (b *backlog) worker(profLabels []string) {
 	var t backlogItemTask
 	var ok bool
 	defer b.wg.Done()
+	util.SetGoroutineLabels(profLabels...)
 
 	for {
 
15 changes: 8 additions & 7 deletions util/execpool/pool.go
@@ -19,8 +19,9 @@ package execpool
 import (
 	"context"
 	"runtime"
-	"runtime/pprof"
 	"sync"
+
+	"github.com/algorand/go-algorand/util"
 )
 
 // The list of all valid priority values. When adding new ones, add them before numPrios.
@@ -82,11 +83,9 @@ func MakePool(owner interface{}, profLabels ...string) ExecutionPool {
 	}
 
 	p.wg.Add(p.numCPUs)
-	pprof.Do(context.Background(), pprof.Labels(profLabels...), func(_ context.Context) {
-		for i := 0; i < p.numCPUs; i++ {
-			go p.worker()
-		}
-	})
+	for i := 0; i < p.numCPUs; i++ {
+		go p.worker(profLabels)
+	}
 	return p
 }
 
@@ -138,12 +137,14 @@ func (p *pool) Shutdown() {
 
 // worker function blocks until a new task is pending on any of the channels and execute the above task.
 // the implementation below would give higher priority for channels that are on higher priority slot.
-func (p *pool) worker() {
+func (p *pool) worker(profLabels []string) {
 	var t enqueuedTask
 	var ok bool
 	lowPrio := p.inputs[LowPriority]
 	highPrio := p.inputs[HighPriority]
 	defer p.wg.Done()
+	util.SetGoroutineLabels(profLabels...)
 
 	for {
 
 		select {
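With this change, labels supplied by a pool's creator are applied by each worker goroutine on startup rather than inherited at spawn. A hedged usage sketch, with label values that are hypothetical rather than taken from go-algorand call sites:

pool := execpool.MakePool(nil, "worker", "verificationPool")
defer pool.Shutdown()
// Each of the pool's numCPUs worker goroutines starts by calling
// util.SetGoroutineLabels("worker", "verificationPool") on itself.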
7 changes: 7 additions & 0 deletions util/util.go
@@ -20,7 +20,9 @@
 package util
 
 import (
+	"context"
 	"fmt"
+	"runtime/pprof"
 	"syscall"
 )
 
@@ -62,3 +64,8 @@ func GetCurrentProcessTimes() (utime int64, stime int64, err error) {
 	}
 	return
 }
+
+// SetGoroutineLabels sets profiler labels for identifying goroutines using the pprof package.
+func SetGoroutineLabels(args ...string) {
+	pprof.SetGoroutineLabels(pprof.WithLabels(context.Background(), pprof.Labels(args...)))
+}
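Note that pprof.Labels panics when given an odd number of strings, so callers of this helper must pass key/value pairs. A minimal, self-contained sketch of how the helper is used and where the labels become visible (the worker names are illustrative, not from the commit):

package main

import (
    "net/http"
    _ "net/http/pprof" // serves /debug/pprof/ on the default mux

    "github.com/algorand/go-algorand/util"
)

func worker(id string) {
    // Label this goroutine; its samples in CPU profiles and its entries in
    // /debug/pprof/goroutine?debug=1 will carry {func=example.worker, id=...}.
    util.SetGoroutineLabels("func", "example.worker", "id", id)
    select {} // stand-in for real work
}

func main() {
    go worker("1")
    go worker("2")
    http.ListenAndServe("localhost:6060", nil)
}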
