Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tools: adapt loadgenerator to cluster testing #4324

Merged
merged 3 commits into from
Aug 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions cmd/loadgenerator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ package main

import (
"encoding/json"
"io"
"net/url"
"os"
"strings"
)

const configFileName = "loadgenerator.config"

type config struct {
// AccountMnemonic is the mnemonic of the account from which we would like to spend Algos.
AccountMnemonic string
Expand All @@ -39,20 +39,30 @@ type config struct {
RoundOffset uint64
// Fee is the amount of algos that would be specified in the transaction fee field.
Fee uint64
// TxnsToSend is the number of transactions to send in the round where (((round + RoundOffset) % RoundModulator) == 0)
TxnsToSend int
}

type fileConfig struct {
config
ClientURL string `json:"ClientURL"`
}

func loadConfig() (cfg config, err error) {
var fd *os.File
fd, err = os.Open(configFileName)
if err != nil {
return config{}, err
func loadConfig(configFileName string) (cfg config, err error) {
var fin io.Reader
if len(configFileName) > 0 && configFileName[0] == '{' {
// read -config "{json literal}"
fin = strings.NewReader(configFileName)
} else {
var fd *os.File
fd, err = os.Open(configFileName)
if err != nil {
return config{}, err
}
defer fd.Close()
fin = fd
}
jsonDecoder := json.NewDecoder(fd)
jsonDecoder := json.NewDecoder(fin)
var fileCfg fileConfig
err = jsonDecoder.Decode(&fileCfg)
if err == nil {
Expand Down
123 changes: 99 additions & 24 deletions cmd/loadgenerator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ package main
import (
"flag"
"fmt"
"io/fs"
"io/ioutil"
"net/url"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
Expand All @@ -30,19 +34,21 @@ import (
"github.com/algorand/go-algorand/daemon/algod/api/client"
generatedV2 "github.com/algorand/go-algorand/daemon/algod/api/server/v2/generated"
"github.com/algorand/go-algorand/daemon/algod/api/spec/common"
algodAcct "github.com/algorand/go-algorand/data/account"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/util/db"
)

const transactionBlockSize = 800

var runOnce = flag.Bool("once", false, "Terminate after first spend loop")

var nroutines = runtime.NumCPU() * 2

func init() {
flag.Parse()
func maybefail(err error, msg string, args ...interface{}) {
if err == nil {
return
}
fmt.Fprintf(os.Stderr, msg, args...)
os.Exit(1)
}

func loadMnemonic(mnemonic string) crypto.Seed {
Expand All @@ -57,13 +63,63 @@ func loadMnemonic(mnemonic string) crypto.Seed {
return seed
}

// Like shared/pingpong/accounts.go
func findRootKeys(algodDir string) []*crypto.SignatureSecrets {
keylist := make([]*crypto.SignatureSecrets, 0, 5)
err := filepath.Walk(algodDir, func(path string, info fs.FileInfo, err error) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, I think filepath.Walk wants you to do this with the err:

Suggested change
err := filepath.Walk(algodDir, func(path string, info fs.FileInfo, err error) error {
err := filepath.Walk(algodDir, func(path string, info fs.FileInfo, err error) error {
if err != nil {
return err
}

var handle db.Accessor
handle, err = db.MakeErasableAccessor(path)
if err != nil {
return nil // don't care, move on
}
defer handle.Close()

// Fetch an account.Participation from the database
root, err := algodAcct.RestoreRoot(handle)
if err != nil {
return nil // don't care, move on
}
keylist = append(keylist, root.Secrets())
return nil
})
if err != nil {
fmt.Fprintf(os.Stderr, "%s: warning, %v\n", algodDir, err)
}
return keylist
}

var runOnce = flag.Bool("once", false, "Terminate after first spend loop")

func main() {
var algodDir string
flag.StringVar(&algodDir, "d", "", "algorand data dir")
var configArg string
flag.StringVar(&configArg, "config", "loadgenerator.config", "path to json or json literal")

var cfg config
var err error
if cfg, err = loadConfig(); err != nil {
flag.Parse()
if cfg, err = loadConfig(configArg); err != nil {
fmt.Fprintf(os.Stderr, "unable to load config : %v\n", err)
os.Exit(1)
}

if (cfg.ClientURL == nil || cfg.ClientURL.String() == "") || cfg.APIToken == "" {
if algodDir != "" {
path := filepath.Join(algodDir, "algod.net")
net, err := ioutil.ReadFile(path)
maybefail(err, "%s: %v\n", path, err)
path = filepath.Join(algodDir, "algod.token")
token, err := ioutil.ReadFile(path)
maybefail(err, "%s: %v\n", path, err)
cfg.ClientURL, err = url.Parse(fmt.Sprintf("http://%s", string(strings.TrimSpace(string(net)))))
maybefail(err, "bad net url %v\n", err)
cfg.APIToken = string(token)
} else {
fmt.Fprintf(os.Stderr, "need (config.ClientURL and config.APIToken) or (-d ALGORAND_DATA)\n")
os.Exit(1)
}
}
fmt.Printf("Configuration file loaded successfully.\n")

var privateKeys []*crypto.SignatureSecrets
Expand All @@ -79,8 +135,19 @@ func main() {
for _, mnemonic := range cfg.AccountMnemonicList {
addKey(mnemonic)
}
} else if len(algodDir) > 0 {
// get test cluster local unlocked wallet
privateKeys := findRootKeys(algodDir)
if len(privateKeys) == 0 {
fmt.Fprintf(os.Stderr, "%s: found no root keys\n", algodDir)
os.Exit(1)
}
publicKeys = make([]basics.Address, len(privateKeys))
for i, sk := range privateKeys {
publicKeys[i] = basics.Address(sk.SignatureVerifier)
}
} else {
fmt.Fprintf(os.Stderr, "no keys specified in config files")
fmt.Fprintf(os.Stderr, "no keys specified in config files or -d algod dir")
}

for i, publicKey := range publicKeys {
Expand Down Expand Up @@ -110,9 +177,10 @@ func nextSpendRound(cfg config, round uint64) uint64 {
func spendLoop(cfg config, privateKey []*crypto.SignatureSecrets, publicKey []basics.Address) (err error) {
restClient := client.MakeRestClient(*cfg.ClientURL, cfg.APIToken)
for {
waitForRound(restClient, cfg, true)
queueFull := generateTransactions(restClient, cfg, privateKey, publicKey)
nodeStatus := waitForRound(restClient, cfg, true)
queueFull := generateTransactions(restClient, cfg, privateKey, publicKey, nodeStatus)
if queueFull {
// done for this round, wait for a non-send round
waitForRound(restClient, cfg, false)
if *runOnce {
fmt.Fprintf(os.Stdout, "Once flag set, terminating.\n")
Expand All @@ -123,8 +191,7 @@ func spendLoop(cfg config, privateKey []*crypto.SignatureSecrets, publicKey []ba
return nil
}

func waitForRound(restClient client.RestClient, cfg config, spendingRound bool) {
var nodeStatus generatedV2.NodeStatusResponse
func waitForRound(restClient client.RestClient, cfg config, spendingRound bool) (nodeStatus generatedV2.NodeStatusResponse) {
var err error
for {
nodeStatus, err = restClient.Status()
Expand All @@ -138,7 +205,7 @@ func waitForRound(restClient client.RestClient, cfg config, spendingRound bool)
return
}
if spendingRound {
fmt.Printf("Current round %d, waiting for spending round %d\n", nodeStatus.LastRound, nextSpendRound(cfg, nodeStatus.LastRound))
fmt.Printf("Last round %d, waiting for spending round %d\n", nodeStatus.LastRound, nextSpendRound(cfg, nodeStatus.LastRound))
}
for {
// wait for the next round.
Expand All @@ -156,14 +223,11 @@ func waitForRound(restClient client.RestClient, cfg config, spendingRound bool)
}
}

func generateTransactions(restClient client.RestClient, cfg config, privateKeys []*crypto.SignatureSecrets, publicKeys []basics.Address) (queueFull bool) {
var nodeStatus generatedV2.NodeStatusResponse
const transactionBlockSize = 800

func generateTransactions(restClient client.RestClient, cfg config, privateKeys []*crypto.SignatureSecrets, publicKeys []basics.Address, nodeStatus generatedV2.NodeStatusResponse) (queueFull bool) {
start := time.Now()
var err error
nodeStatus, err = restClient.Status()
Copy link
Contributor

@cce cce Jul 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh OK without this change, multiple loadgenerators wouldn't necessarily all target the same block... but one might also miss the block (like we were seeing) if something slow happens between waitForRound and generateTransactions?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't think of any reason why you would miss the round by the time you get around to sending, unless the node you're talking to was running behind or something

if err != nil {
fmt.Fprintf(os.Stderr, "unable to check status : %v", err)
return false
}
var vers common.Version
vers, err = restClient.Versions()
if err != nil {
Expand All @@ -172,8 +236,12 @@ func generateTransactions(restClient client.RestClient, cfg config, privateKeys
}
var genesisHash crypto.Digest
copy(genesisHash[:], vers.GenesisHash)
// create transactionBlockSize transaction to send.
txns := make([]transactions.SignedTxn, transactionBlockSize, transactionBlockSize)
sendSize := cfg.TxnsToSend
if cfg.TxnsToSend == 0 {
sendSize = transactionBlockSize
}
// create sendSize transaction to send.
txns := make([]transactions.SignedTxn, sendSize, sendSize)
for i := range txns {
tx := transactions.Transaction{
Header: transactions.Header{
Expand All @@ -196,13 +264,14 @@ func generateTransactions(restClient client.RestClient, cfg config, privateKeys
}

// create multiple go-routines to send all these requests.
// each thread makes new HTTP connections per API call
var sendWaitGroup sync.WaitGroup
sendWaitGroup.Add(nroutines)
sent := make([]int, nroutines, nroutines)
for i := 0; i < nroutines; i++ {
go func(base int) {
defer sendWaitGroup.Done()
for x := base; x < transactionBlockSize; x += nroutines {
for x := base; x < sendSize; x += nroutines {
_, err2 := restClient.SendRawTransaction(txns[x])
if err2 != nil {
if strings.Contains(err2.Error(), "txn dead") || strings.Contains(err2.Error(), "below threshold") {
Expand All @@ -220,5 +289,11 @@ func generateTransactions(restClient client.RestClient, cfg config, privateKeys
for i := 0; i < nroutines; i++ {
totalSent += sent[i]
}
return totalSent != transactionBlockSize
dt := time.Now().Sub(start)
fmt.Fprintf(os.Stdout, "sent %d/%d in %s (%.1f/s)\n", totalSent, sendSize, dt.String(), float64(totalSent)/dt.Seconds())
if cfg.TxnsToSend != 0 {
// We attempted what we were asked. We're done.
return true
}
return totalSent != sendSize
}