Skip to content

Commit

Permalink
boostx stats: output agent version and retrieval protocols (#1304)
Browse files Browse the repository at this point in the history
* feat: boostx stats - output agent version and retrieval protocols

* fix: boostx stats - count agent versions for both boost and legacy markets nodes
  • Loading branch information
dirkmc authored Mar 20, 2023
1 parent 7804bd3 commit a1e7cc1
Showing 1 changed file with 141 additions and 48 deletions.
189 changes: 141 additions & 48 deletions cmd/boostx/stats_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package main

import (
"fmt"
"github.com/filecoin-project/boost/retrievalmarket/lp2pimpl"
transports_types "github.com/filecoin-project/boost/retrievalmarket/types"
"sort"
"strings"
"sync"
Expand All @@ -21,6 +23,20 @@ var statsCmd = &cli.Command{
Name: "stats",
Description: "Statistics on how many SPs are running Boost",
Before: before,
Flags: []cli.Flag{
&cli.IntFlag{
Name: "lotus-read-concurrency",
Value: 50,
},
&cli.IntFlag{
Name: "sp-query-concurrency",
Value: 1,
},
&cli.IntFlag{
Name: "sp-query-max",
Value: 0,
},
},
Action: func(cctx *cli.Context) error {
ctx := cliutil.ReqContext(cctx)

Expand Down Expand Up @@ -49,7 +65,7 @@ var statsCmd = &cli.Command{
minerToMinerPower := make(map[address.Address]power.Claim)
minerToTotalPower := make(map[address.Address]power.Claim)

throttle := make(chan struct{}, 50)
throttle := make(chan struct{}, cctx.Int("lotus-read-concurrency"))
for _, miner := range miners {
throttle <- struct{}{}
go func(miner address.Address) {
Expand Down Expand Up @@ -78,68 +94,125 @@ var statsCmd = &cli.Command{
fmt.Println("Total SPs with minimum power: ", len(withMinPower))

var boostNodes, marketsNodes, noProtocolsNodes, indexerNodes int

boostRawBytePower := big.NewInt(0)
boostQualityAdjPower := big.NewInt(0)
agentVersions := make(map[string]int)
transportProtos := make(map[string]int)

for _, maddr := range withMinPower {
throttle = make(chan struct{}, cctx.Int("sp-query-concurrency"))
for i, maddr := range withMinPower {
select {
case <-ctx.Done():
return nil
default:
}
err := func() error {
addrInfo, err := cmd.GetAddrInfo(ctx, api, maddr)
if err != nil {
return fmt.Errorf("getting provider multi-address: %w", err)
}

log.Debugw("connecting to storage provider",
"id", addrInfo.ID, "multiaddrs", addrInfo.Addrs, "addr", maddr)
spQueryMax := cctx.Int("sp-query-max")
if spQueryMax > 0 && i >= spQueryMax {
break
}

if err := n.Host.Connect(ctx, *addrInfo); err != nil {
return fmt.Errorf("connecting to peer %s: %w", addrInfo.ID, err)
}
throttle <- struct{}{}
wg.Add(1)
go func(maddr address.Address) {
defer wg.Done()
defer func() {
<-throttle
}()
err := func() error {
addrInfo, err := cmd.GetAddrInfo(ctx, api, maddr)
if err != nil {
return fmt.Errorf("getting provider multi-address: %w", err)
}

log.Debugw("connecting to storage provider",
"id", addrInfo.ID, "multiaddrs", addrInfo.Addrs, "addr", maddr)

if err := n.Host.Connect(ctx, *addrInfo); err != nil {
return fmt.Errorf("connecting to peer %s: %w", addrInfo.ID, err)
}

// Get peer's libp2p protocols
protos, err := n.Host.Peerstore().GetProtocols(addrInfo.ID)
if err != nil {
return fmt.Errorf("getting protocols for peer %s: %w", addrInfo.ID, err)
}
sort.Strings(protos)

// Get peer's libp2p agent version
agentVersionI, err := n.Host.Peerstore().Get(addrInfo.ID, "AgentVersion")
if err != nil {
return fmt.Errorf("getting agent version for peer %s: %w", addrInfo.ID, err)
}
agentVersion, ok := agentVersionI.(string)
if !ok {
return fmt.Errorf("AgentVersion for peer %s is not a string: type %T", addrInfo.ID, agentVersionI)
}

// Get SP's supported transports
var transports *transports_types.QueryResponse
if contains(protos, string(lp2pimpl.TransportsProtocolID)) {
client := lp2pimpl.NewTransportsClient(n.Host)
transports, err = client.SendQuery(ctx, addrInfo.ID)
if err != nil {
fmt.Printf("Failed to fetch transports from peer %s: %s\n", addrInfo.ID, err)
}
}

lk.Lock()
var out string
out += "Provider " + maddr.String()
if contains(protos, "/fil/storage/mk/1.2.0") {
out += " is running boost"

protos, err := n.Host.Peerstore().GetProtocols(addrInfo.ID)
boostNodes++
boostQualityAdjPower = big.Add(boostQualityAdjPower, minerToMinerPower[maddr].QualityAdjPower)
boostRawBytePower = big.Add(boostRawBytePower, minerToMinerPower[maddr].RawBytePower)
agentVersions[agentVersion]++
if transports != nil {
for _, p := range transports.Protocols {
transportProtos[p.Name]++
}
}
} else if contains(protos, "/fil/storage/mk/1.1.0") {
out += " is running markets"
agentVersions[agentVersion]++
marketsNodes++
} else {
out += " is not running markets or boost"
noProtocolsNodes++
}
if contains(protos, "/legs/head/") {
out += " (with indexer)"
indexerNodes++
}
lk.Unlock()

out += "\n"
out += " agent version: " + agentVersion + "\n"
out += " raw power: " + minerToMinerPower[maddr].RawBytePower.String() + "\n"
out += " quality adj power: " + minerToMinerPower[maddr].QualityAdjPower.String() + "\n"
out += " protocols:\n"
out += " " + strings.Join(protos, "\n ") + "\n"

if transports != nil {
out += " transports:\n"
for _, p := range transports.Protocols {
out += " " + p.Name + "\n"
}
}

fmt.Print(out)
return nil
}()
if err != nil {
return fmt.Errorf("getting protocols for peer %s: %w", addrInfo.ID, err)
fmt.Println("Error: ", err)
}
sort.Strings(protos)

fmt.Print("Provider " + maddr.String())
if contains(protos, "/fil/storage/mk/1.2.0") {
fmt.Print(" is running boost")
fmt.Println()

fmt.Println("boost provider ", maddr.String(), "raw power:", minerToMinerPower[maddr].RawBytePower)
fmt.Println("boost provider ", maddr.String(), "quality adj power:", minerToMinerPower[maddr].QualityAdjPower)
fmt.Println("boost provider ", maddr.String(), "protos:", protos)

boostNodes++
boostQualityAdjPower = big.Add(boostQualityAdjPower, minerToMinerPower[maddr].QualityAdjPower)
boostRawBytePower = big.Add(boostRawBytePower, minerToMinerPower[maddr].RawBytePower)
} else if contains(protos, "/fil/storage/mk/1.1.0") {
fmt.Print(" is running markets")
marketsNodes++
} else {
fmt.Print(" is running fewer protocols")
noProtocolsNodes++
}
if contains(protos, "/legs/head/") {
fmt.Print(" (with indexer)")
indexerNodes++
}
fmt.Println()

return nil
}()
if err != nil {
fmt.Println("Error: ", err)
continue
}
}(maddr)
}

wg.Wait()

fmt.Println()
fmt.Println("Total Boost nodes:", boostNodes)
fmt.Println("Total Boost raw power:", boostRawBytePower)
Expand All @@ -148,6 +221,26 @@ var statsCmd = &cli.Command{
fmt.Println("Total SPs with minimum power: ", len(withMinPower))
fmt.Println("Total Indexer nodes:", indexerNodes)

agentVersionsOrder := make([]string, 0, len(agentVersions))
for av := range agentVersions {
agentVersionsOrder = append(agentVersionsOrder, av)
}
sort.Strings(agentVersionsOrder)
fmt.Println("Agent Versions:")
for _, av := range agentVersionsOrder {
fmt.Printf(" %s: %d\n", av, agentVersions[av])
}

transportsOrder := make([]string, 0, len(transportProtos))
for transport := range transportProtos {
transportsOrder = append(transportsOrder, transport)
}
sort.Strings(transportsOrder)
fmt.Println("Retrieval Protocol Support:")
for _, transport := range transportsOrder {
fmt.Printf(" %s: %d\n", transport, transportProtos[transport])
}

return nil
},
}
Expand Down

0 comments on commit a1e7cc1

Please sign in to comment.