From a1e7cc12b7f599f974af395784dd1351c32d179f Mon Sep 17 00:00:00 2001
From: dirkmc
Date: Mon, 20 Mar 2023 21:10:37 +0800
Subject: [PATCH] boostx stats: output agent version and retrieval protocols
 (#1304)

* feat: boostx stats - output agent version and retrieval protocols

* fix: boostx stats - count agent versions for both boost and legacy markets nodes
---
 cmd/boostx/stats_cmd.go | 189 ++++++++++++++++++++++++++++++----------
 1 file changed, 141 insertions(+), 48 deletions(-)

diff --git a/cmd/boostx/stats_cmd.go b/cmd/boostx/stats_cmd.go
index 3ec302745..e0ad64cd4 100644
--- a/cmd/boostx/stats_cmd.go
+++ b/cmd/boostx/stats_cmd.go
@@ -2,6 +2,8 @@ package main
 
 import (
 	"fmt"
+	"github.com/filecoin-project/boost/retrievalmarket/lp2pimpl"
+	transports_types "github.com/filecoin-project/boost/retrievalmarket/types"
 	"sort"
 	"strings"
 	"sync"
@@ -21,6 +23,20 @@ var statsCmd = &cli.Command{
 	Name: "stats",
 	Description: "Statistics on how many SPs are running Boost",
 	Before: before,
+	Flags: []cli.Flag{
+		&cli.IntFlag{
+			Name: "lotus-read-concurrency",
+			Value: 50,
+		},
+		&cli.IntFlag{
+			Name: "sp-query-concurrency",
+			Value: 1,
+		},
+		&cli.IntFlag{
+			Name: "sp-query-max",
+			Value: 0,
+		},
+	},
 	Action: func(cctx *cli.Context) error {
 		ctx := cliutil.ReqContext(cctx)
 
@@ -49,7 +65,7 @@ var statsCmd = &cli.Command{
 		minerToMinerPower := make(map[address.Address]power.Claim)
 		minerToTotalPower := make(map[address.Address]power.Claim)
 
-		throttle := make(chan struct{}, 50)
+		throttle := make(chan struct{}, cctx.Int("lotus-read-concurrency"))
 		for _, miner := range miners {
 			throttle <- struct{}{}
 			go func(miner address.Address) {
@@ -78,68 +94,125 @@ var statsCmd = &cli.Command{
 		fmt.Println("Total SPs with minimum power: ", len(withMinPower))
 
 		var boostNodes, marketsNodes, noProtocolsNodes, indexerNodes int
-
 		boostRawBytePower := big.NewInt(0)
 		boostQualityAdjPower := big.NewInt(0)
+		agentVersions := make(map[string]int)
+		transportProtos := make(map[string]int)
 
-		for _, maddr := range withMinPower {
+		throttle = make(chan struct{}, cctx.Int("sp-query-concurrency"))
+		for i, maddr := range withMinPower {
 			select {
 			case <-ctx.Done():
 				return nil
 			default:
 			}
 
-			err := func() error {
-				addrInfo, err := cmd.GetAddrInfo(ctx, api, maddr)
-				if err != nil {
-					return fmt.Errorf("getting provider multi-address: %w", err)
-				}
 
-				log.Debugw("connecting to storage provider",
-					"id", addrInfo.ID, "multiaddrs", addrInfo.Addrs, "addr", maddr)
+			spQueryMax := cctx.Int("sp-query-max")
+			if spQueryMax > 0 && i >= spQueryMax {
+				break
+			}
 
-				if err := n.Host.Connect(ctx, *addrInfo); err != nil {
-					return fmt.Errorf("connecting to peer %s: %w", addrInfo.ID, err)
-				}
+			throttle <- struct{}{}
+			wg.Add(1)
+			go func(maddr address.Address) {
+				defer wg.Done()
+				defer func() {
+					<-throttle
+				}()
+				err := func() error {
+					addrInfo, err := cmd.GetAddrInfo(ctx, api, maddr)
+					if err != nil {
+						return fmt.Errorf("getting provider multi-address: %w", err)
+					}
+
+					log.Debugw("connecting to storage provider",
+						"id", addrInfo.ID, "multiaddrs", addrInfo.Addrs, "addr", maddr)
+
+					if err := n.Host.Connect(ctx, *addrInfo); err != nil {
+						return fmt.Errorf("connecting to peer %s: %w", addrInfo.ID, err)
+					}
+
+					// Get peer's libp2p protocols
+					protos, err := n.Host.Peerstore().GetProtocols(addrInfo.ID)
+					if err != nil {
+						return fmt.Errorf("getting protocols for peer %s: %w", addrInfo.ID, err)
+					}
+					sort.Strings(protos)
+
+					// Get peer's libp2p agent version
+					agentVersionI, err := n.Host.Peerstore().Get(addrInfo.ID, "AgentVersion")
+					if err != nil {
+						return fmt.Errorf("getting agent version for peer %s: %w", addrInfo.ID, err)
+					}
+					agentVersion, ok := agentVersionI.(string)
+					if !ok {
+						return fmt.Errorf("AgentVersion for peer %s is not a string: type %T", addrInfo.ID, agentVersionI)
+					}
+
+					// Get SP's supported transports
+					var transports *transports_types.QueryResponse
+					if contains(protos, string(lp2pimpl.TransportsProtocolID)) {
+						client := lp2pimpl.NewTransportsClient(n.Host)
+						transports, err = client.SendQuery(ctx, addrInfo.ID)
+						if err != nil {
+							fmt.Printf("Failed to fetch transports from peer %s: %s\n", addrInfo.ID, err)
+						}
+					}
+
+					lk.Lock()
+					var out string
+					out += "Provider " + maddr.String()
+					if contains(protos, "/fil/storage/mk/1.2.0") {
+						out += " is running boost"
 
-				protos, err := n.Host.Peerstore().GetProtocols(addrInfo.ID)
+						boostNodes++
+						boostQualityAdjPower = big.Add(boostQualityAdjPower, minerToMinerPower[maddr].QualityAdjPower)
+						boostRawBytePower = big.Add(boostRawBytePower, minerToMinerPower[maddr].RawBytePower)
+						agentVersions[agentVersion]++
+						if transports != nil {
+							for _, p := range transports.Protocols {
+								transportProtos[p.Name]++
+							}
+						}
+					} else if contains(protos, "/fil/storage/mk/1.1.0") {
+						out += " is running markets"
+						agentVersions[agentVersion]++
+						marketsNodes++
+					} else {
+						out += " is not running markets or boost"
+						noProtocolsNodes++
+					}
+					if contains(protos, "/legs/head/") {
+						out += " (with indexer)"
+						indexerNodes++
+					}
+					lk.Unlock()
+
+					out += "\n"
+					out += " agent version: " + agentVersion + "\n"
+					out += " raw power: " + minerToMinerPower[maddr].RawBytePower.String() + "\n"
+					out += " quality adj power: " + minerToMinerPower[maddr].QualityAdjPower.String() + "\n"
+					out += " protocols:\n"
+					out += "  " + strings.Join(protos, "\n  ") + "\n"
+
+					if transports != nil {
+						out += " transports:\n"
+						for _, p := range transports.Protocols {
+							out += "  " + p.Name + "\n"
+						}
+					}
+
+					fmt.Print(out)
+					return nil
+				}()
 				if err != nil {
-					return fmt.Errorf("getting protocols for peer %s: %w", addrInfo.ID, err)
+					fmt.Println("Error: ", err)
 				}
-				sort.Strings(protos)
-
-				fmt.Print("Provider " + maddr.String())
-				if contains(protos, "/fil/storage/mk/1.2.0") {
-					fmt.Print(" is running boost")
-					fmt.Println()
-
-					fmt.Println("boost provider ", maddr.String(), "raw power:", minerToMinerPower[maddr].RawBytePower)
-					fmt.Println("boost provider ", maddr.String(), "quality adj power:", minerToMinerPower[maddr].QualityAdjPower)
-					fmt.Println("boost provider ", maddr.String(), "protos:", protos)
-
-					boostNodes++
-					boostQualityAdjPower = big.Add(boostQualityAdjPower, minerToMinerPower[maddr].QualityAdjPower)
-					boostRawBytePower = big.Add(boostRawBytePower, minerToMinerPower[maddr].RawBytePower)
-				} else if contains(protos, "/fil/storage/mk/1.1.0") {
-					fmt.Print(" is running markets")
-					marketsNodes++
-				} else {
-					fmt.Print(" is running fewer protocols")
-					noProtocolsNodes++
-				}
-				if contains(protos, "/legs/head/") {
-					fmt.Print(" (with indexer)")
-					indexerNodes++
-				}
-				fmt.Println()
-
-				return nil
-			}()
-			if err != nil {
-				fmt.Println("Error: ", err)
-				continue
-			}
+			}(maddr)
 		}
+		wg.Wait()
+
 		fmt.Println()
 		fmt.Println("Total Boost nodes:", boostNodes)
 		fmt.Println("Total Boost raw power:", boostRawBytePower)
@@ -148,6 +221,26 @@ var statsCmd = &cli.Command{
 		fmt.Println("Total SPs with minimum power: ", len(withMinPower))
 		fmt.Println("Total Indexer nodes:", indexerNodes)
 
+		agentVersionsOrder := make([]string, 0, len(agentVersions))
+		for av := range agentVersions {
+			agentVersionsOrder = append(agentVersionsOrder, av)
+		}
+		sort.Strings(agentVersionsOrder)
+		fmt.Println("Agent Versions:")
+		for _, av := range agentVersionsOrder {
+			fmt.Printf(" %s: %d\n", av, agentVersions[av])
+		}
+
+		transportsOrder := make([]string, 0, len(transportProtos))
+		for transport := range transportProtos {
+			transportsOrder = append(transportsOrder, transport)
+		}
+		sort.Strings(transportsOrder)
+		fmt.Println("Retrieval Protocol Support:")
+		for _, transport := range transportsOrder {
+			fmt.Printf(" %s: %d\n", transport, transportProtos[transport])
+		}
+
 		return nil
 	},
 }
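Note: for manually spot-checking a single SP, the snippet below is a minimal, hypothetical sketch of the same lookups the patch performs per provider: read AgentVersion from the peerstore after connecting, then query supported retrieval transports via lp2pimpl.NewTransportsClient / SendQuery. The multiaddr is a placeholder, the ephemeral libp2p.New host and go-libp2p import paths are assumptions tied to whatever go-libp2p version this branch builds against, and panic-style error handling is only for brevity; in the stats command the provider address comes from cmd.GetAddrInfo against the Lotus API instead.

package main

import (
	"context"
	"fmt"

	"github.com/filecoin-project/boost/retrievalmarket/lp2pimpl"
	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/peer"
)

func main() {
	ctx := context.Background()

	// Ephemeral libp2p host used only to dial the provider.
	h, err := libp2p.New()
	if err != nil {
		panic(err)
	}
	defer h.Close()

	// Placeholder provider address (the stats command resolves this from the
	// Lotus API via cmd.GetAddrInfo instead).
	ai, err := peer.AddrInfoFromString("/ip4/203.0.113.7/tcp/24001/p2p/<provider-peer-id>")
	if err != nil {
		panic(err)
	}
	if err := h.Connect(ctx, *ai); err != nil {
		panic(err)
	}

	// AgentVersion is populated in the peerstore by the identify exchange
	// that runs when the connection is established.
	if av, err := h.Peerstore().Get(ai.ID, "AgentVersion"); err == nil {
		fmt.Println("agent version:", av)
	}

	// Ask the provider which retrieval transports it advertises.
	client := lp2pimpl.NewTransportsClient(h)
	resp, err := client.SendQuery(ctx, ai.ID)
	if err != nil {
		panic(err)
	}
	for _, p := range resp.Protocols {
		fmt.Println("retrieval transport:", p.Name)
	}
}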