diff --git a/agreement/demux.go b/agreement/demux.go index 33e15e6cfd..2f0e9b269b 100644 --- a/agreement/demux.go +++ b/agreement/demux.go @@ -25,6 +25,7 @@ import ( "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/logging/logspec" "github.com/algorand/go-algorand/protocol" + "github.com/algorand/go-algorand/util" ) const ( @@ -113,6 +114,7 @@ func (d *demux) tokenizeMessages(ctx context.Context, net Network, tag protocol. defer func() { close(decoded) }() + util.SetGoroutineLabels("tokenizeTag", string(tag)) for { select { case raw, ok := <-networkMessages: diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index c5422af849..6301b1b521 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -138,11 +138,11 @@ func (n *P2PNetwork) Start() { for i := 0; i < incomingThreads; i++ { n.wg.Add(1) // We pass the peersConnectivityCheckTicker.C here so that we don't need to syncronize the access to the ticker's data structure. - go n.handler.messageHandlerThread(&n.wg, n.wsPeersConnectivityCheckTicker.C, n) + go n.handler.messageHandlerThread(&n.wg, n.wsPeersConnectivityCheckTicker.C, n, "network", "P2PNetwork") } n.wg.Add(1) - go n.broadcaster.broadcastThread(&n.wg, n) + go n.broadcaster.broadcastThread(&n.wg, n, "network", "P2PNetwork") n.service.DialPeersUntilTargetCount(n.config.GossipFanout) n.wg.Add(1) diff --git a/network/wsNetwork.go b/network/wsNetwork.go index 6fc97def5e..4a491f4f9f 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -747,10 +747,10 @@ func (wn *WebsocketNetwork) Start() { for i := 0; i < incomingThreads; i++ { wn.wg.Add(1) // We pass the peersConnectivityCheckTicker.C here so that we don't need to syncronize the access to the ticker's data structure. - go wn.handler.messageHandlerThread(&wn.wg, wn.peersConnectivityCheckTicker.C, wn) + go wn.handler.messageHandlerThread(&wn.wg, wn.peersConnectivityCheckTicker.C, wn, "network", "WebsocketNetwork") } wn.wg.Add(1) - go wn.broadcaster.broadcastThread(&wn.wg, wn) + go wn.broadcaster.broadcastThread(&wn.wg, wn, "network", "WebsocketNetwork") if wn.prioScheme != nil { wn.wg.Add(1) go wn.prioWeightRefresh() @@ -1129,8 +1129,9 @@ func (wn *WebsocketNetwork) maybeSendMessagesOfInterest(peer *wsPeer, messagesOf } } -func (wn *msgHandler) messageHandlerThread(wg *sync.WaitGroup, peersConnectivityCheckCh <-chan time.Time, net networkPeerManager) { +func (wn *msgHandler) messageHandlerThread(wg *sync.WaitGroup, peersConnectivityCheckCh <-chan time.Time, net networkPeerManager, profLabels ...string) { defer wg.Done() + util.SetGoroutineLabels(append(profLabels, "func", "msgHandler.messageHandlerThread")...) for { select { @@ -1231,8 +1232,9 @@ func (wn *msgHandler) sendFilterMessage(msg IncomingMessage, net networkPeerMana } } -func (wn *msgBroadcaster) broadcastThread(wg *sync.WaitGroup, net networkPeerManager) { +func (wn *msgBroadcaster) broadcastThread(wg *sync.WaitGroup, net networkPeerManager, profLabels ...string) { defer wg.Done() + util.SetGoroutineLabels(append(profLabels, "func", "msgHandler.broadcastThread")...) slowWritingPeerCheckTicker := time.NewTicker(wn.slowWritingPeerMonitorInterval) defer slowWritingPeerCheckTicker.Stop() diff --git a/node/node.go b/node/node.go index 58160ff3e0..384bd258f6 100644 --- a/node/node.go +++ b/node/node.go @@ -216,9 +216,9 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd } node.net = p2pNode - node.cryptoPool = execpool.MakePool(node) - node.lowPriorityCryptoVerificationPool = execpool.MakeBacklog(node.cryptoPool, 2*node.cryptoPool.GetParallelism(), execpool.LowPriority, node) - node.highPriorityCryptoVerificationPool = execpool.MakeBacklog(node.cryptoPool, 2*node.cryptoPool.GetParallelism(), execpool.HighPriority, node) + node.cryptoPool = execpool.MakePool(node, "worker", "cryptoPool") + node.lowPriorityCryptoVerificationPool = execpool.MakeBacklog(node.cryptoPool, 2*node.cryptoPool.GetParallelism(), execpool.LowPriority, node, "worker", "lowPriorityCryptoVerificationPool") + node.highPriorityCryptoVerificationPool = execpool.MakeBacklog(node.cryptoPool, 2*node.cryptoPool.GetParallelism(), execpool.HighPriority, node, "worker", "highPriorityCryptoVerificationPool") ledgerPaths := ledger.DirsAndPrefix{ DBFilePrefix: config.LedgerFilenamePrefix, ResolvedGenesisDirs: node.genesisDirs, @@ -1061,6 +1061,7 @@ func (node *AlgorandFullNode) OnNewBlock(block bookkeeping.Block, delta ledgerco // don't have to delete key for each block we received. func (node *AlgorandFullNode) oldKeyDeletionThread(done <-chan struct{}) { defer node.monitoringRoutinesWaitGroup.Done() + for { select { case <-done: diff --git a/util/execpool/backlog.go b/util/execpool/backlog.go index 44728d1d9e..c98a2fd427 100644 --- a/util/execpool/backlog.go +++ b/util/execpool/backlog.go @@ -19,6 +19,8 @@ package execpool import ( "context" "sync" + + "github.com/algorand/go-algorand/util" ) // A backlog for an execution pool. The typical usage of this is to @@ -47,7 +49,7 @@ type BacklogPool interface { } // MakeBacklog creates a backlog -func MakeBacklog(execPool ExecutionPool, backlogSize int, priority Priority, owner interface{}) BacklogPool { +func MakeBacklog(execPool ExecutionPool, backlogSize int, priority Priority, owner interface{}, profLabels ...string) BacklogPool { if backlogSize < 0 { return nil } @@ -59,7 +61,7 @@ func MakeBacklog(execPool ExecutionPool, backlogSize int, priority Priority, own bl.ctx, bl.ctxCancel = context.WithCancel(context.Background()) if bl.pool == nil { // create one internally. - bl.pool = MakePool(bl) + bl.pool = MakePool(bl, append(profLabels, "execpool", "internal")...) } if backlogSize == 0 { // use the number of cpus in the system. @@ -68,7 +70,7 @@ func MakeBacklog(execPool ExecutionPool, backlogSize int, priority Priority, own bl.buffer = make(chan backlogItemTask, backlogSize) bl.wg.Add(1) - go bl.worker() + go bl.worker(profLabels) return bl } @@ -129,10 +131,11 @@ func (b *backlog) Shutdown() { } } -func (b *backlog) worker() { +func (b *backlog) worker(profLabels []string) { var t backlogItemTask var ok bool defer b.wg.Done() + util.SetGoroutineLabels(profLabels...) for { diff --git a/util/execpool/pool.go b/util/execpool/pool.go index caa7353ac7..426edd10cb 100644 --- a/util/execpool/pool.go +++ b/util/execpool/pool.go @@ -20,6 +20,8 @@ import ( "context" "runtime" "sync" + + "github.com/algorand/go-algorand/util" ) // The list of all valid priority values. When adding new ones, add them before numPrios. @@ -68,7 +70,7 @@ type enqueuedTask struct { } // MakePool creates a pool. -func MakePool(owner interface{}) ExecutionPool { +func MakePool(owner interface{}, profLabels ...string) ExecutionPool { p := &pool{ inputs: make([]chan enqueuedTask, numPrios), numCPUs: runtime.NumCPU(), @@ -82,9 +84,8 @@ func MakePool(owner interface{}) ExecutionPool { p.wg.Add(p.numCPUs) for i := 0; i < p.numCPUs; i++ { - go p.worker() + go p.worker(profLabels) } - return p } @@ -136,12 +137,14 @@ func (p *pool) Shutdown() { // worker function blocks until a new task is pending on any of the channels and execute the above task. // the implementation below would give higher priority for channels that are on higher priority slot. -func (p *pool) worker() { +func (p *pool) worker(profLabels []string) { var t enqueuedTask var ok bool lowPrio := p.inputs[LowPriority] highPrio := p.inputs[HighPriority] defer p.wg.Done() + util.SetGoroutineLabels(profLabels...) + for { select { diff --git a/util/process.go b/util/process.go index e7ce85ed92..c872b63fe5 100644 --- a/util/process.go +++ b/util/process.go @@ -17,9 +17,11 @@ package util import ( + "context" "io" "os" "os/exec" + "runtime/pprof" "sync" "time" ) @@ -73,3 +75,8 @@ func ExecAndCaptureOutput(command string, args ...string) (string, string, error return string(outputStdout), string(outputStderr), err } + +// SetGoroutineLabels sets profiler labels for identifying goroutines using the pprof package. +func SetGoroutineLabels(args ...string) { + pprof.SetGoroutineLabels(pprof.WithLabels(context.Background(), pprof.Labels(args...))) +}