From 182e02345585893652187852d14871f1cb50cbc5 Mon Sep 17 00:00:00 2001 From: Sam Liokumovich <65994425+samliok@users.noreply.github.com> Date: Thu, 10 Oct 2024 18:19:44 -0400 Subject: [PATCH] Decouple Spam Functionality from CLI into Throughput Package (#1636) --- auth/consts.go | 4 + auth/utils.go | 44 ++ cli/spam.go | 616 ++---------------- cli/storage.go | 14 +- examples/morpheusvm/auth/keys.go | 108 +++ .../cmd/morpheus-cli/cmd/handler.go | 4 +- .../morpheusvm/cmd/morpheus-cli/cmd/key.go | 106 +-- .../morpheusvm/cmd/morpheus-cli/cmd/root.go | 8 + .../morpheusvm/cmd/morpheus-cli/cmd/spam.go | 84 +-- examples/morpheusvm/tests/e2e/e2e_test.go | 9 +- .../tests/integration/integration_test.go | 2 +- .../morpheusvm/tests/workload/workload.go | 9 +- examples/morpheusvm/throughput/helper.go | 65 ++ .../cmd/vmwithcontracts-cli/cmd/handler.go | 4 +- .../cmd/vmwithcontracts-cli/cmd/key.go | 17 +- .../cmd/vmwithcontracts-cli/cmd/spam.go | 38 +- .../vmwithcontracts/tests/e2e/e2e_test.go | 2 +- tests/e2e/e2e.go | 41 +- throughput/errors.go | 8 + throughput/helper.go | 40 ++ throughput/issuer.go | 171 +++++ throughput/pacer.go | 59 ++ throughput/spam.go | 426 ++++++++++++ throughput/utils.go | 12 + 24 files changed, 1087 insertions(+), 804 deletions(-) create mode 100644 auth/utils.go create mode 100644 examples/morpheusvm/auth/keys.go create mode 100644 examples/morpheusvm/throughput/helper.go create mode 100644 throughput/errors.go create mode 100644 throughput/helper.go create mode 100644 throughput/issuer.go create mode 100644 throughput/pacer.go create mode 100644 throughput/spam.go create mode 100644 throughput/utils.go diff --git a/auth/consts.go b/auth/consts.go index 0f88ee726b..2548a4e69e 100644 --- a/auth/consts.go +++ b/auth/consts.go @@ -11,6 +11,10 @@ const ( ED25519ID uint8 = 0 SECP256R1ID uint8 = 1 BLSID uint8 = 2 + + ED25519Key = "ed25519" + Secp256r1Key = "secp256r1" + BLSKey = "bls" ) func Engines() map[uint8]vm.AuthEngine { diff --git a/auth/utils.go b/auth/utils.go new file mode 100644 index 0000000000..baed04b64c --- /dev/null +++ b/auth/utils.go @@ -0,0 +1,44 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package auth + +import ( + "errors" + + "github.com/ava-labs/hypersdk/chain" + "github.com/ava-labs/hypersdk/codec" + "github.com/ava-labs/hypersdk/crypto/bls" + "github.com/ava-labs/hypersdk/crypto/ed25519" + "github.com/ava-labs/hypersdk/crypto/secp256r1" +) + +var ErrInvalidKeyType = errors.New("invalid key type") + +// Used for testing & CLI purposes +type PrivateKey struct { + Address codec.Address + // Bytes is the raw private key bytes + Bytes []byte +} + +// GetFactory returns the [chain.AuthFactory] for a given private key. +// +// A [chain.AuthFactory] signs transactions and provides a unit estimate +// for using a given private key (needed to estimate fees for a transaction). +func GetFactory(pk *PrivateKey) (chain.AuthFactory, error) { + switch pk.Address[0] { + case ED25519ID: + return NewED25519Factory(ed25519.PrivateKey(pk.Bytes)), nil + case SECP256R1ID: + return NewSECP256R1Factory(secp256r1.PrivateKey(pk.Bytes)), nil + case BLSID: + p, err := bls.PrivateKeyFromBytes(pk.Bytes) + if err != nil { + return nil, err + } + return NewBLSFactory(p), nil + default: + return nil, ErrInvalidKeyType + } +} diff --git a/cli/spam.go b/cli/spam.go index 00147651e4..01ce1c16b4 100644 --- a/cli/spam.go +++ b/cli/spam.go @@ -1,630 +1,124 @@ // Copyright (C) 2024, Ava Labs, Inc. All rights reserved. // See the file LICENSE for licensing terms. -//nolint:gosec package cli import ( "context" - "encoding/binary" - "fmt" - "math/rand" - "os" - "os/signal" - "runtime" - "strings" - "sync" - "sync/atomic" - "syscall" - "time" - "github.com/ava-labs/avalanchego/utils/set" - "golang.org/x/sync/errgroup" - - "github.com/ava-labs/hypersdk/api/jsonrpc" - "github.com/ava-labs/hypersdk/api/ws" - "github.com/ava-labs/hypersdk/chain" "github.com/ava-labs/hypersdk/cli/prompt" - "github.com/ava-labs/hypersdk/codec" "github.com/ava-labs/hypersdk/consts" - "github.com/ava-labs/hypersdk/fees" - "github.com/ava-labs/hypersdk/pubsub" - "github.com/ava-labs/hypersdk/utils" -) - -const ( - pendingTargetMultiplier = 10 - successfulRunsToIncreaseTarget = 10 - failedRunsToDecreaseTarget = 5 - - issuerShutdownTimeout = 60 * time.Second -) - -var ( - maxConcurrency = runtime.NumCPU() - issuerWg sync.WaitGroup - - l sync.Mutex - confirmedTxs uint64 - totalTxs uint64 - - inflight atomic.Int64 - sent atomic.Int64 + "github.com/ava-labs/hypersdk/throughput" ) -type SpamHelper interface { - // CreateAccount generates a new account and returns the [PrivateKey]. - // - // The spammer tracks all created accounts and orchestrates the return of funds - // sent to any created accounts on shutdown. If the spammer exits ungracefully, - // any funds sent to created accounts will be lost unless they are persisted by - // the [SpamHelper] implementation. - CreateAccount() (*PrivateKey, error) - // GetFactory returns the [chain.AuthFactory] for a given private key. - // - // A [chain.AuthFactory] signs transactions and provides a unit estimate - // for using a given private key (needed to estimate fees for a transaction). - GetFactory(pk *PrivateKey) (chain.AuthFactory, error) - - // CreateClient instructs the [SpamHelper] to create and persist a VM-specific - // JSONRPC client. - // - // This client is used to retrieve the [chain.Parser] and the balance - // of arbitrary addresses. - // - // TODO: consider making these functions part of the required JSONRPC - // interface for the HyperSDK. - CreateClient(uri string) error - GetParser(ctx context.Context) (chain.Parser, error) - LookupBalance(choice int, address codec.Address) (uint64, error) - - // GetTransfer returns a list of actions that sends [amount] to a given [address]. - // - // Memo is used to ensure that each transaction is unique (even if between the same - // sender and receiver for the same amount). - GetTransfer(address codec.Address, amount uint64, memo []byte) []chain.Action -} - -func (h *Handler) Spam(sh SpamHelper) error { - ctx := context.Background() - +// BuildSpammer prompts the user for the spammer parameters. If [defaults], the default values are used once the +// chain and root key are selected. Otherwise, the user is prompted for all parameters. +func (h *Handler) BuildSpammer(sh throughput.SpamHelper, defaults bool) (*throughput.Spammer, error) { // Select chain chains, err := h.GetChains() if err != nil { - return err + return nil, err } _, uris, err := prompt.SelectChain("select chainID", chains) if err != nil { - return err - } - cli := jsonrpc.NewJSONRPCClient(uris[0]) - if err != nil { - return err + return nil, err } // Select root key keys, err := h.GetKeys() if err != nil { - return err + return nil, err } balances := make([]uint64, len(keys)) if err := sh.CreateClient(uris[0]); err != nil { - return err + return nil, err } for i := 0; i < len(keys); i++ { - balance, err := sh.LookupBalance(i, keys[i].Address) + balance, err := sh.LookupBalance(keys[i].Address) if err != nil { - return err + return nil, err } balances[i] = balance } + keyIndex, err := prompt.Choice("select root key", len(keys)) if err != nil { - return err + return nil, err } key := keys[keyIndex] balance := balances[keyIndex] - factory, err := sh.GetFactory(key) - if err != nil { - return err - } - // No longer using db, so we close if err := h.CloseDatabase(); err != nil { - return err - } - - // Compute max units - parser, err := sh.GetParser(ctx) - if err != nil { - return err + return nil, err + } + + if defaults { + return throughput.NewSpammer( + uris, + key, + balance, + 1.01, + 2.7, + 100000, // tx per second + 15000, // min tx per second + 1000, // tx per second step + 10, // num clients + 10000000, // num accounts + ), nil } - actions := sh.GetTransfer(keys[0].Address, 0, uniqueBytes()) - maxUnits, err := chain.EstimateUnits(parser.Rules(time.Now().UnixMilli()), actions, factory) - if err != nil { - return err - } - // Collect parameters numAccounts, err := prompt.Int("number of accounts", consts.MaxInt) if err != nil { - return err + return nil, err } if numAccounts < 2 { - return ErrInsufficientAccounts + return nil, ErrInsufficientAccounts } sZipf, err := prompt.Float("s (Zipf distribution = [(v+k)^(-s)], Default = 1.01)", consts.MaxFloat64) if err != nil { - return err + return nil, err } vZipf, err := prompt.Float("v (Zipf distribution = [(v+k)^(-s)], Default = 2.7)", consts.MaxFloat64) if err != nil { - return err + return nil, err } + txsPerSecond, err := prompt.Int("txs to try and issue per second", consts.MaxInt) if err != nil { - return err + return nil, err } minTxsPerSecond, err := prompt.Int("minimum txs to issue per second", consts.MaxInt) if err != nil { - return err + return nil, err } txsPerSecondStep, err := prompt.Int("txs to increase per second", consts.MaxInt) if err != nil { - return err + return nil, err } numClients, err := prompt.Int("number of clients per node", consts.MaxInt) if err != nil { - return err - } - - // Log Zipf participants - zipfSeed := rand.New(rand.NewSource(0)) - zz := rand.NewZipf(zipfSeed, sZipf, vZipf, uint64(numAccounts)-1) - trials := txsPerSecond * 60 * 2 // sender/receiver - unique := set.NewSet[uint64](trials) - for i := 0; i < trials; i++ { - unique.Add(zz.Uint64()) - } - utils.Outf("{{blue}}unique participants expected every 60s:{{/}} %d\n", unique.Len()) - - // Distribute funds - unitPrices, err := cli.UnitPrices(ctx, false) - if err != nil { - return err - } - feePerTx, err := fees.MulSum(unitPrices, maxUnits) - if err != nil { - return err - } - withholding := feePerTx * uint64(numAccounts) - if balance < withholding { - return fmt.Errorf("insufficient funds (have=%d need=%d)", balance, withholding) - } - distAmount := (balance - withholding) / uint64(numAccounts) - utils.Outf( - "{{yellow}}distributing funds to each account:{{/}} %s %s\n", - utils.FormatBalance(distAmount), - h.c.Symbol(), - ) - accounts := make([]*PrivateKey, numAccounts) - webSocketClient, err := ws.NewWebSocketClient(uris[0], ws.DefaultHandshakeTimeout, pubsub.MaxPendingMessages, pubsub.MaxReadMessageSize) // we write the max read - if err != nil { - return err - } - funds := map[codec.Address]uint64{} - factories := make([]chain.AuthFactory, numAccounts) - var fundsL sync.Mutex - p := &pacer{ws: webSocketClient} - go p.Run(ctx, minTxsPerSecond) - for i := 0; i < numAccounts; i++ { - // Create account - pk, err := sh.CreateAccount() - if err != nil { - return err - } - accounts[i] = pk - f, err := sh.GetFactory(pk) - if err != nil { - return err - } - factories[i] = f - - // Send funds - actions := sh.GetTransfer(pk.Address, distAmount, uniqueBytes()) - _, tx, err := cli.GenerateTransactionManual(parser, actions, factory, feePerTx) - if err != nil { - return err - } - if err := p.Add(tx); err != nil { - return fmt.Errorf("%w: failed to register tx", err) - } - funds[pk.Address] = distAmount - - // Log progress - if i%250 == 0 && i > 0 { - utils.Outf("{{yellow}}issued transfer to %d accounts{{/}}\n", i) - } - } - if err := p.Wait(); err != nil { - return err - } - utils.Outf("{{yellow}}distributed funds to %d accounts{{/}}\n", numAccounts) - - // Kickoff txs - issuers := []*issuer{} - for i := 0; i < len(uris); i++ { - for j := 0; j < numClients; j++ { - cli := jsonrpc.NewJSONRPCClient(uris[i]) - webSocketClient, err := ws.NewWebSocketClient(uris[i], ws.DefaultHandshakeTimeout, pubsub.MaxPendingMessages, pubsub.MaxReadMessageSize) // we write the max read - if err != nil { - return err - } - issuer := &issuer{i: len(issuers), cli: cli, ws: webSocketClient, parser: parser, uri: uris[i]} - issuers = append(issuers, issuer) - } - } - signals := make(chan os.Signal, 2) - signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) - - // Start issuers - unitPrices, err = issuers[0].cli.UnitPrices(ctx, false) - if err != nil { - return err - } - cctx, cancel := context.WithCancel(ctx) - defer cancel() - for _, issuer := range issuers { - issuer.Start(cctx) - } - - // Log stats - t := time.NewTicker(1 * time.Second) // ensure no duplicates created - defer t.Stop() - var psent int64 - go func() { - for { - select { - case <-t.C: - current := sent.Load() - l.Lock() - if totalTxs > 0 { - unitPrices, err = issuers[0].cli.UnitPrices(ctx, false) - if err != nil { - continue - } - utils.Outf( - "{{yellow}}txs seen:{{/}} %d {{yellow}}success rate:{{/}} %.2f%% {{yellow}}inflight:{{/}} %d {{yellow}}issued/s:{{/}} %d {{yellow}}unit prices:{{/}} [%s]\n", //nolint:lll - totalTxs, - float64(confirmedTxs)/float64(totalTxs)*100, - inflight.Load(), - current-psent, - unitPrices, - ) - } - l.Unlock() - psent = current - case <-cctx.Done(): - return - } - } - }() - - // Broadcast txs - var ( - // Do not call this function concurrently (math.Rand is not safe for concurrent use) - z = rand.NewZipf(zipfSeed, sZipf, vZipf, uint64(numAccounts)-1) - - it = time.NewTimer(0) - currentTarget = min(txsPerSecond, minTxsPerSecond) - consecutiveUnderBacklog int - consecutiveAboveBacklog int - - stop bool - ) - utils.Outf("{{cyan}}initial target tps:{{/}} %d\n", currentTarget) - for !stop { - select { - case <-it.C: - start := time.Now() - - // Check to see if we should wait for pending txs - if int64(currentTarget)+inflight.Load() > int64(currentTarget*pendingTargetMultiplier) { - consecutiveUnderBacklog = 0 - consecutiveAboveBacklog++ - if consecutiveAboveBacklog >= failedRunsToDecreaseTarget { - if currentTarget > txsPerSecondStep { - currentTarget -= txsPerSecondStep - utils.Outf("{{cyan}}skipping issuance because large backlog detected, decreasing target tps:{{/}} %d\n", currentTarget) - } else { - utils.Outf("{{cyan}}skipping issuance because large backlog detected, cannot decrease target{{/}}\n") - } - consecutiveAboveBacklog = 0 - } - it.Reset(1 * time.Second) - break - } - - // Issue txs - g := &errgroup.Group{} - g.SetLimit(maxConcurrency) - for i := 0; i < currentTarget; i++ { - senderIndex, recipientIndex := z.Uint64(), z.Uint64() - sender := accounts[senderIndex] - if recipientIndex == senderIndex { - if recipientIndex == uint64(numAccounts-1) { - recipientIndex-- - } else { - recipientIndex++ - } - } - recipient := accounts[recipientIndex].Address - issuer := getRandomIssuer(issuers) - g.Go(func() error { - factory := factories[senderIndex] - fundsL.Lock() - balance := funds[sender.Address] - if feePerTx > balance { - fundsL.Unlock() - utils.Outf("{{orange}}tx has insufficient funds:{{/}} %s\n", sender.Address) - return fmt.Errorf("%s has insufficient funds", sender.Address) - } - funds[sender.Address] = balance - feePerTx - fundsL.Unlock() - - // Send transaction - actions := sh.GetTransfer(recipient, 1, uniqueBytes()) - return issuer.Send(cctx, actions, factory, feePerTx) - }) - } - - // Wait for txs to finish - if err := g.Wait(); err != nil { - // We don't return here because we want to return funds - utils.Outf("{{orange}}broadcast loop error:{{/}} %v\n", err) - stop = true - break - } - - // Determine how long to sleep - dur := time.Since(start) - sleep := max(float64(consts.MillisecondsPerSecond-dur.Milliseconds()), 0) - it.Reset(time.Duration(sleep) * time.Millisecond) - - // Check to see if we should increase target - consecutiveAboveBacklog = 0 - consecutiveUnderBacklog++ - if consecutiveUnderBacklog >= successfulRunsToIncreaseTarget && currentTarget < txsPerSecond { - currentTarget = min(currentTarget+txsPerSecondStep, txsPerSecond) - utils.Outf("{{cyan}}increasing target tps:{{/}} %d\n", currentTarget) - consecutiveUnderBacklog = 0 - } - case <-cctx.Done(): - stop = true - utils.Outf("{{yellow}}context canceled{{/}}\n") - case <-signals: - stop = true - utils.Outf("{{yellow}}exiting broadcast loop{{/}}\n") - cancel() - } - } - - // Wait for all issuers to finish - utils.Outf("{{yellow}}waiting for issuers to return{{/}}\n") - issuerWg.Wait() - - // Return funds - utils.Outf("{{yellow}}returning funds to %s{{/}}\n", key.Address) - unitPrices, err = cli.UnitPrices(ctx, false) - if err != nil { - return err - } - feePerTx, err = fees.MulSum(unitPrices, maxUnits) - if err != nil { - return err - } - var returnedBalance uint64 - p = &pacer{ws: webSocketClient} - go p.Run(ctx, minTxsPerSecond) - for i := 0; i < numAccounts; i++ { - // Determine if we should return funds - balance := funds[accounts[i].Address] - if feePerTx > balance { - continue - } - - // Send funds - returnAmt := balance - feePerTx - actions := sh.GetTransfer(key.Address, returnAmt, uniqueBytes()) - _, tx, err := cli.GenerateTransactionManual(parser, actions, factories[i], feePerTx) - if err != nil { - return err - } - if err := p.Add(tx); err != nil { - return err - } - returnedBalance += returnAmt - - if i%250 == 0 && i > 0 { - utils.Outf("{{yellow}}checked %d accounts for fund return{{/}}\n", i) - } - } - if err := p.Wait(); err != nil { - utils.Outf("{{orange}}failed to return funds:{{/}} %v\n", err) - return err - } - utils.Outf( - "{{yellow}}returned funds:{{/}} %s %s\n", - utils.FormatBalance(returnedBalance), - h.c.Symbol(), - ) - return nil + return nil, err + } + + return throughput.NewSpammer( + uris, + key, + balance, + sZipf, + vZipf, + txsPerSecond, + minTxsPerSecond, + txsPerSecondStep, + numClients, + numAccounts, + ), nil } -type pacer struct { - ws *ws.WebSocketClient - - inflight chan struct{} - done chan error -} - -func (p *pacer) Run(ctx context.Context, max int) { - p.inflight = make(chan struct{}, max) - p.done = make(chan error) - - for range p.inflight { - _, wsErr, result, err := p.ws.ListenTx(ctx) - if err != nil { - p.done <- err - return - } - if wsErr != nil { - p.done <- wsErr - return - } - if !result.Success { - // Should never happen - p.done <- fmt.Errorf("%w: %s", ErrTxFailed, result.Error) - return - } - } - p.done <- nil -} - -func (p *pacer) Add(tx *chain.Transaction) error { - if err := p.ws.RegisterTx(tx); err != nil { - return err - } - select { - case p.inflight <- struct{}{}: - return nil - case err := <-p.done: - return err - } -} - -func (p *pacer) Wait() error { - close(p.inflight) - return <-p.done -} - -type issuer struct { - i int - uri string - parser chain.Parser - - // TODO: clean up potential race conditions here. - l sync.Mutex - cli *jsonrpc.JSONRPCClient - ws *ws.WebSocketClient - outstandingTxs int - abandoned error -} - -func (i *issuer) Start(ctx context.Context) { - issuerWg.Add(1) - go func() { - for { - _, wsErr, result, err := i.ws.ListenTx(context.TODO()) - if err != nil { - return - } - i.l.Lock() - i.outstandingTxs-- - i.l.Unlock() - inflight.Add(-1) - l.Lock() - if result != nil { - if result.Success { - confirmedTxs++ - } else { - utils.Outf("{{orange}}on-chain tx failure:{{/}} %s %t\n", string(result.Error), result.Success) - } - } else { - // We can't error match here because we receive it over the wire. - if !strings.Contains(wsErr.Error(), ws.ErrExpired.Error()) { - utils.Outf("{{orange}}pre-execute tx failure:{{/}} %v\n", wsErr) - } - } - totalTxs++ - l.Unlock() - } - }() - go func() { - defer func() { - _ = i.ws.Close() - issuerWg.Done() - }() - - <-ctx.Done() - start := time.Now() - for time.Since(start) < issuerShutdownTimeout { - if i.ws.Closed() { - return - } - i.l.Lock() - outstanding := i.outstandingTxs - i.l.Unlock() - if outstanding == 0 { - return - } - utils.Outf("{{orange}}waiting for issuer %d to finish:{{/}} %d\n", i.i, outstanding) - time.Sleep(time.Second) - } - utils.Outf("{{orange}}issuer %d shutdown timeout{{/}}\n", i.i) - }() -} - -func (i *issuer) Send(ctx context.Context, actions []chain.Action, factory chain.AuthFactory, feePerTx uint64) error { - // Construct transaction - _, tx, err := i.cli.GenerateTransactionManual(i.parser, actions, factory, feePerTx) +func (h *Handler) Spam(ctx context.Context, sh throughput.SpamHelper, defaults bool) error { + spammer, err := h.BuildSpammer(sh, defaults) if err != nil { - utils.Outf("{{orange}}failed to generate tx:{{/}} %v\n", err) - return fmt.Errorf("failed to generate tx: %w", err) - } - - // Increase outstanding txs for issuer - i.l.Lock() - i.outstandingTxs++ - i.l.Unlock() - inflight.Add(1) - - // Register transaction and recover upon failure - if err := i.ws.RegisterTx(tx); err != nil { - i.l.Lock() - if i.ws.Closed() { - if i.abandoned != nil { - i.l.Unlock() - return i.abandoned - } - - // Attempt to recreate issuer - utils.Outf("{{orange}}re-creating issuer:{{/}} %d {{orange}}uri:{{/}} %s\n", i.i, i.uri) - ws, err := ws.NewWebSocketClient(i.uri, ws.DefaultHandshakeTimeout, pubsub.MaxPendingMessages, pubsub.MaxReadMessageSize) // we write the max read - if err != nil { - i.abandoned = err - utils.Outf("{{orange}}could not re-create closed issuer:{{/}} %v\n", err) - i.l.Unlock() - return err - } - i.ws = ws - i.l.Unlock() - - i.Start(ctx) - utils.Outf("{{green}}re-created closed issuer:{{/}} %d\n", i.i) - } - - // If issuance fails during retry, we should fail - return i.ws.RegisterTx(tx) + return err } - return nil -} - -func getRandomIssuer(issuers []*issuer) *issuer { - index := rand.Int() % len(issuers) - return issuers[index] -} -func uniqueBytes() []byte { - return binary.BigEndian.AppendUint64(nil, uint64(sent.Add(1))) + return spammer.Spam(ctx, sh, false, h.c.Symbol()) } diff --git a/cli/storage.go b/cli/storage.go index dba44bc60d..546b6c6d94 100644 --- a/cli/storage.go +++ b/cli/storage.go @@ -10,6 +10,7 @@ import ( "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/hypersdk/auth" "github.com/ava-labs/hypersdk/codec" "github.com/ava-labs/hypersdk/utils" ) @@ -67,7 +68,7 @@ func (h *Handler) GetDefaultChain(log bool) (ids.ID, []string, error) { return chainID, uris, nil } -func (h *Handler) StoreKey(priv *PrivateKey) error { +func (h *Handler) StoreKey(priv *auth.PrivateKey) error { k := make([]byte, 1+codec.AddressLen) k[0] = keyPrefix copy(k[1:], priv.Address[:]) @@ -96,20 +97,15 @@ func (h *Handler) GetKey(addr codec.Address) ([]byte, error) { return v, nil } -type PrivateKey struct { - Address codec.Address - Bytes []byte -} - -func (h *Handler) GetKeys() ([]*PrivateKey, error) { +func (h *Handler) GetKeys() ([]*auth.PrivateKey, error) { iter := h.db.NewIteratorWithPrefix([]byte{keyPrefix}) defer iter.Release() - privateKeys := []*PrivateKey{} + privateKeys := []*auth.PrivateKey{} for iter.Next() { // It is safe to use these bytes directly because the database copies the // iterator value for us. - privateKeys = append(privateKeys, &PrivateKey{ + privateKeys = append(privateKeys, &auth.PrivateKey{ Address: codec.Address(iter.Key()[1:]), Bytes: iter.Value(), }) diff --git a/examples/morpheusvm/auth/keys.go b/examples/morpheusvm/auth/keys.go new file mode 100644 index 0000000000..5ec5c47f97 --- /dev/null +++ b/examples/morpheusvm/auth/keys.go @@ -0,0 +1,108 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +// Package auth provides utilities for generating and loading private keys. +// This package is only used for testing and CLI purposes and is not required +// to be implemented by the VM developer. + +package auth + +import ( + "errors" + "fmt" + + "github.com/ava-labs/hypersdk/auth" + "github.com/ava-labs/hypersdk/crypto/bls" + "github.com/ava-labs/hypersdk/crypto/ed25519" + "github.com/ava-labs/hypersdk/crypto/secp256r1" + "github.com/ava-labs/hypersdk/utils" +) + +var ErrInvalidKeyType = errors.New("invalid key type") + +// TODO: make these functions general purpose where the VM provides a set of valid strings, +// functions to generate corresponding new private keys, and the functionality +// to unmarshal private key bytes into the correct type. +func CheckKeyType(k string) error { + switch k { + case auth.ED25519Key, auth.Secp256r1Key, auth.BLSKey: + return nil + default: + return fmt.Errorf("%w: %s", ErrInvalidKeyType, k) + } +} + +func GeneratePrivateKey(k string) (*auth.PrivateKey, error) { + switch k { + case auth.ED25519Key: + p, err := ed25519.GeneratePrivateKey() + if err != nil { + return nil, err + } + return &auth.PrivateKey{ + Address: auth.NewED25519Address(p.PublicKey()), + Bytes: p[:], + }, nil + case auth.Secp256r1Key: + p, err := secp256r1.GeneratePrivateKey() + if err != nil { + return nil, err + } + return &auth.PrivateKey{ + Address: auth.NewSECP256R1Address(p.PublicKey()), + Bytes: p[:], + }, nil + case auth.BLSKey: + p, err := bls.GeneratePrivateKey() + if err != nil { + return nil, err + } + return &auth.PrivateKey{ + Address: auth.NewBLSAddress(bls.PublicFromPrivateKey(p)), + Bytes: bls.PrivateKeyToBytes(p), + }, nil + default: + return nil, ErrInvalidKeyType + } +} + +func LoadPrivateKey(k string, path string) (*auth.PrivateKey, error) { + switch k { + case auth.ED25519Key: + p, err := utils.LoadBytes(path, ed25519.PrivateKeyLen) + if err != nil { + return nil, err + } + pk := ed25519.PrivateKey(p) + return &auth.PrivateKey{ + Address: auth.NewED25519Address(pk.PublicKey()), + Bytes: p, + }, nil + case auth.Secp256r1Key: + p, err := utils.LoadBytes(path, secp256r1.PrivateKeyLen) + if err != nil { + return nil, err + } + pk := secp256r1.PrivateKey(p) + return &auth.PrivateKey{ + Address: auth.NewSECP256R1Address(pk.PublicKey()), + Bytes: p, + }, nil + case auth.BLSKey: + p, err := utils.LoadBytes(path, bls.PrivateKeyLen) + if err != nil { + return nil, err + } + + privKey, err := bls.PrivateKeyFromBytes(p) + if err != nil { + return nil, err + } + return &auth.PrivateKey{ + Address: auth.NewBLSAddress(bls.PublicFromPrivateKey(privKey)), + Bytes: p, + }, nil + default: + return nil, ErrInvalidKeyType + } +} diff --git a/examples/morpheusvm/cmd/morpheus-cli/cmd/handler.go b/examples/morpheusvm/cmd/morpheus-cli/cmd/handler.go index d8f957c26f..f913572fe2 100644 --- a/examples/morpheusvm/cmd/morpheus-cli/cmd/handler.go +++ b/examples/morpheusvm/cmd/morpheus-cli/cmd/handler.go @@ -38,7 +38,7 @@ func (h *Handler) Root() *cli.Handler { } func (h *Handler) DefaultActor() ( - ids.ID, *cli.PrivateKey, chain.AuthFactory, + ids.ID, *auth.PrivateKey, chain.AuthFactory, *jsonrpc.JSONRPCClient, *vm.JSONRPCClient, *ws.WebSocketClient, error, ) { addr, priv, err := h.h.GetDefaultKey(true) @@ -73,7 +73,7 @@ func (h *Handler) DefaultActor() ( return ids.Empty, nil, nil, nil, nil, nil, err } // For [defaultActor], we always send requests to the first returned URI. - return chainID, &cli.PrivateKey{ + return chainID, &auth.PrivateKey{ Address: addr, Bytes: priv, }, factory, jcli, diff --git a/examples/morpheusvm/cmd/morpheus-cli/cmd/key.go b/examples/morpheusvm/cmd/morpheus-cli/cmd/key.go index 000c092b31..edb0e6014c 100644 --- a/examples/morpheusvm/cmd/morpheus-cli/cmd/key.go +++ b/examples/morpheusvm/cmd/morpheus-cli/cmd/key.go @@ -4,108 +4,12 @@ package cmd import ( - "fmt" - "github.com/spf13/cobra" - "github.com/ava-labs/hypersdk/auth" - "github.com/ava-labs/hypersdk/cli" - "github.com/ava-labs/hypersdk/crypto/bls" - "github.com/ava-labs/hypersdk/crypto/ed25519" - "github.com/ava-labs/hypersdk/crypto/secp256r1" + "github.com/ava-labs/hypersdk/examples/morpheusvm/auth" "github.com/ava-labs/hypersdk/utils" ) -const ( - ed25519Key = "ed25519" - secp256r1Key = "secp256r1" - blsKey = "bls" -) - -func checkKeyType(k string) error { - switch k { - case ed25519Key, secp256r1Key, blsKey: - return nil - default: - return fmt.Errorf("%w: %s", ErrInvalidKeyType, k) - } -} - -func generatePrivateKey(k string) (*cli.PrivateKey, error) { - switch k { - case ed25519Key: - p, err := ed25519.GeneratePrivateKey() - if err != nil { - return nil, err - } - return &cli.PrivateKey{ - Address: auth.NewED25519Address(p.PublicKey()), - Bytes: p[:], - }, nil - case secp256r1Key: - p, err := secp256r1.GeneratePrivateKey() - if err != nil { - return nil, err - } - return &cli.PrivateKey{ - Address: auth.NewSECP256R1Address(p.PublicKey()), - Bytes: p[:], - }, nil - case blsKey: - p, err := bls.GeneratePrivateKey() - if err != nil { - return nil, err - } - return &cli.PrivateKey{ - Address: auth.NewBLSAddress(bls.PublicFromPrivateKey(p)), - Bytes: bls.PrivateKeyToBytes(p), - }, nil - default: - return nil, ErrInvalidKeyType - } -} - -func loadPrivateKey(k string, path string) (*cli.PrivateKey, error) { - switch k { - case ed25519Key: - p, err := utils.LoadBytes(path, ed25519.PrivateKeyLen) - if err != nil { - return nil, err - } - pk := ed25519.PrivateKey(p) - return &cli.PrivateKey{ - Address: auth.NewED25519Address(pk.PublicKey()), - Bytes: p, - }, nil - case secp256r1Key: - p, err := utils.LoadBytes(path, secp256r1.PrivateKeyLen) - if err != nil { - return nil, err - } - pk := secp256r1.PrivateKey(p) - return &cli.PrivateKey{ - Address: auth.NewSECP256R1Address(pk.PublicKey()), - Bytes: p, - }, nil - case blsKey: - p, err := utils.LoadBytes(path, bls.PrivateKeyLen) - if err != nil { - return nil, err - } - - privKey, err := bls.PrivateKeyFromBytes(p) - if err != nil { - return nil, err - } - return &cli.PrivateKey{ - Address: auth.NewBLSAddress(bls.PublicFromPrivateKey(privKey)), - Bytes: p, - }, nil - default: - return nil, ErrInvalidKeyType - } -} - var keyCmd = &cobra.Command{ Use: "key", RunE: func(*cobra.Command, []string) error { @@ -119,10 +23,10 @@ var genKeyCmd = &cobra.Command{ if len(args) != 1 { return ErrInvalidArgs } - return checkKeyType(args[0]) + return auth.CheckKeyType(args[0]) }, RunE: func(_ *cobra.Command, args []string) error { - priv, err := generatePrivateKey(args[0]) + priv, err := auth.GeneratePrivateKey(args[0]) if err != nil { return err } @@ -146,10 +50,10 @@ var importKeyCmd = &cobra.Command{ if len(args) != 2 { return ErrInvalidArgs } - return checkKeyType(args[0]) + return auth.CheckKeyType(args[0]) }, RunE: func(_ *cobra.Command, args []string) error { - priv, err := loadPrivateKey(args[0], args[1]) + priv, err := auth.LoadPrivateKey(args[0], args[1]) if err != nil { return err } diff --git a/examples/morpheusvm/cmd/morpheus-cli/cmd/root.go b/examples/morpheusvm/cmd/morpheus-cli/cmd/root.go index 6f58834b86..b3187e39a6 100644 --- a/examples/morpheusvm/cmd/morpheus-cli/cmd/root.go +++ b/examples/morpheusvm/cmd/morpheus-cli/cmd/root.go @@ -30,6 +30,7 @@ var ( minBlockGap int64 hideTxs bool checkAllChains bool + spamDefaults bool prometheusBaseURI string prometheusOpenBrowser bool prometheusFile string @@ -142,6 +143,13 @@ func init() { transferCmd, ) + runSpamCmd.PersistentFlags().BoolVar( + &spamDefaults, + "defaults", + false, + "use default spam parameters", + ) + // spam spamCmd.AddCommand( runSpamCmd, diff --git a/examples/morpheusvm/cmd/morpheus-cli/cmd/spam.go b/examples/morpheusvm/cmd/morpheus-cli/cmd/spam.go index 289c45624b..f159879511 100644 --- a/examples/morpheusvm/cmd/morpheus-cli/cmd/spam.go +++ b/examples/morpheusvm/cmd/morpheus-cli/cmd/spam.go @@ -8,85 +8,10 @@ import ( "github.com/spf13/cobra" - "github.com/ava-labs/hypersdk/api/ws" - "github.com/ava-labs/hypersdk/auth" - "github.com/ava-labs/hypersdk/chain" - "github.com/ava-labs/hypersdk/cli" - "github.com/ava-labs/hypersdk/codec" - "github.com/ava-labs/hypersdk/crypto/bls" - "github.com/ava-labs/hypersdk/crypto/ed25519" - "github.com/ava-labs/hypersdk/crypto/secp256r1" - "github.com/ava-labs/hypersdk/examples/morpheusvm/actions" - "github.com/ava-labs/hypersdk/examples/morpheusvm/consts" - "github.com/ava-labs/hypersdk/examples/morpheusvm/vm" - "github.com/ava-labs/hypersdk/pubsub" - "github.com/ava-labs/hypersdk/utils" + "github.com/ava-labs/hypersdk/examples/morpheusvm/auth" + "github.com/ava-labs/hypersdk/examples/morpheusvm/throughput" ) -type SpamHelper struct { - keyType string - cli *vm.JSONRPCClient - ws *ws.WebSocketClient -} - -func (sh *SpamHelper) CreateAccount() (*cli.PrivateKey, error) { - return generatePrivateKey(sh.keyType) -} - -func (*SpamHelper) GetFactory(pk *cli.PrivateKey) (chain.AuthFactory, error) { - switch pk.Address[0] { - case auth.ED25519ID: - return auth.NewED25519Factory(ed25519.PrivateKey(pk.Bytes)), nil - case auth.SECP256R1ID: - return auth.NewSECP256R1Factory(secp256r1.PrivateKey(pk.Bytes)), nil - case auth.BLSID: - p, err := bls.PrivateKeyFromBytes(pk.Bytes) - if err != nil { - return nil, err - } - return auth.NewBLSFactory(p), nil - default: - return nil, ErrInvalidKeyType - } -} - -func (sh *SpamHelper) CreateClient(uri string) error { - sh.cli = vm.NewJSONRPCClient(uri) - ws, err := ws.NewWebSocketClient(uri, ws.DefaultHandshakeTimeout, pubsub.MaxPendingMessages, pubsub.MaxReadMessageSize) - if err != nil { - return err - } - sh.ws = ws - return nil -} - -func (sh *SpamHelper) GetParser(ctx context.Context) (chain.Parser, error) { - return sh.cli.Parser(ctx) -} - -func (sh *SpamHelper) LookupBalance(choice int, address codec.Address) (uint64, error) { - balance, err := sh.cli.Balance(context.TODO(), address) - if err != nil { - return 0, err - } - utils.Outf( - "%d) {{cyan}}address:{{/}} %s {{cyan}}balance:{{/}} %s %s\n", - choice, - address, - utils.FormatBalance(balance), - consts.Symbol, - ) - return balance, err -} - -func (*SpamHelper) GetTransfer(address codec.Address, amount uint64, memo []byte) []chain.Action { - return []chain.Action{&actions.Transfer{ - To: address, - Value: amount, - Memo: memo, - }} -} - var spamCmd = &cobra.Command{ Use: "spam", RunE: func(*cobra.Command, []string) error { @@ -100,9 +25,10 @@ var runSpamCmd = &cobra.Command{ if len(args) != 1 { return ErrInvalidArgs } - return checkKeyType(args[0]) + return auth.CheckKeyType(args[0]) }, RunE: func(_ *cobra.Command, args []string) error { - return handler.Root().Spam(&SpamHelper{keyType: args[0]}) + ctx := context.Background() + return handler.Root().Spam(ctx, &throughput.SpamHelper{KeyType: args[0]}, spamDefaults) }, } diff --git a/examples/morpheusvm/tests/e2e/e2e_test.go b/examples/morpheusvm/tests/e2e/e2e_test.go index 8c0d85b327..ea9b491741 100644 --- a/examples/morpheusvm/tests/e2e/e2e_test.go +++ b/examples/morpheusvm/tests/e2e/e2e_test.go @@ -11,8 +11,10 @@ import ( "github.com/stretchr/testify/require" "github.com/ava-labs/hypersdk/abi" + "github.com/ava-labs/hypersdk/auth" "github.com/ava-labs/hypersdk/examples/morpheusvm/consts" "github.com/ava-labs/hypersdk/examples/morpheusvm/tests/workload" + "github.com/ava-labs/hypersdk/examples/morpheusvm/throughput" "github.com/ava-labs/hypersdk/examples/morpheusvm/vm" "github.com/ava-labs/hypersdk/tests/fixture" @@ -36,7 +38,7 @@ func init() { var _ = ginkgo.SynchronizedBeforeSuite(func() []byte { require := require.New(ginkgo.GinkgoT()) - gen, workloadFactory, err := workload.New(100 /* minBlockGap: 100ms */) + gen, workloadFactory, spamKey, err := workload.New(100 /* minBlockGap: 100ms */) require.NoError(err) genesisBytes, err := json.Marshal(gen) @@ -50,9 +52,12 @@ var _ = ginkgo.SynchronizedBeforeSuite(func() []byte { // Import HyperSDK e2e test coverage and inject MorpheusVM name // and workload factory to orchestrate the test. - he2e.SetWorkload(consts.Name, workloadFactory, parser, expectedABI) + spamHelper := throughput.SpamHelper{ + KeyType: auth.ED25519Key, + } tc := e2e.NewTestContext() + he2e.SetWorkload(consts.Name, workloadFactory, expectedABI, parser, &spamHelper, spamKey) return fixture.NewTestEnvironment(tc, flagVars, owner, consts.Name, consts.ID, genesisBytes).Marshal() }, func(envBytes []byte) { diff --git a/examples/morpheusvm/tests/integration/integration_test.go b/examples/morpheusvm/tests/integration/integration_test.go index ad65e27433..97b1a28312 100644 --- a/examples/morpheusvm/tests/integration/integration_test.go +++ b/examples/morpheusvm/tests/integration/integration_test.go @@ -25,7 +25,7 @@ func TestIntegration(t *testing.T) { var _ = ginkgo.BeforeSuite(func() { require := require.New(ginkgo.GinkgoT()) - genesis, workloadFactory, err := morpheusWorkload.New(0 /* minBlockGap: 0ms */) + genesis, workloadFactory, _, err := morpheusWorkload.New(0 /* minBlockGap: 0ms */) require.NoError(err) genesisBytes, err := json.Marshal(genesis) diff --git a/examples/morpheusvm/tests/workload/workload.go b/examples/morpheusvm/tests/workload/workload.go index 77a821ce5d..f529fa68bc 100644 --- a/examples/morpheusvm/tests/workload/workload.go +++ b/examples/morpheusvm/tests/workload/workload.go @@ -63,7 +63,7 @@ type workloadFactory struct { addrs []codec.Address } -func New(minBlockGap int64) (*genesis.DefaultGenesis, workload.TxWorkloadFactory, error) { +func New(minBlockGap int64) (*genesis.DefaultGenesis, workload.TxWorkloadFactory, *auth.PrivateKey, error) { customAllocs := make([]*genesis.CustomAllocation, 0, len(ed25519Addrs)) for _, prefundedAddr := range ed25519Addrs { customAllocs = append(customAllocs, &genesis.CustomAllocation{ @@ -71,7 +71,10 @@ func New(minBlockGap int64) (*genesis.DefaultGenesis, workload.TxWorkloadFactory Balance: initialBalance, }) } - + spamKey := &auth.PrivateKey{ + Address: ed25519Addrs[0], + Bytes: ed25519PrivKeys[0][:], + } genesis := genesis.NewDefaultGenesis(customAllocs) // Set WindowTargetUnits to MaxUint64 for all dimensions to iterate full mempool during block building. genesis.Rules.WindowTargetUnits = fees.Dimensions{math.MaxUint64, math.MaxUint64, math.MaxUint64, math.MaxUint64, math.MaxUint64} @@ -83,7 +86,7 @@ func New(minBlockGap int64) (*genesis.DefaultGenesis, workload.TxWorkloadFactory return genesis, &workloadFactory{ factories: ed25519AuthFactories, addrs: ed25519Addrs, - }, nil + }, spamKey, nil } func (f *workloadFactory) NewSizedTxWorkload(uri string, size int) (workload.TxWorkloadIterator, error) { diff --git a/examples/morpheusvm/throughput/helper.go b/examples/morpheusvm/throughput/helper.go new file mode 100644 index 0000000000..33ab2f95a9 --- /dev/null +++ b/examples/morpheusvm/throughput/helper.go @@ -0,0 +1,65 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +// Package throughput implements the SpamHelper interface. This package is not +// required to be implemented by the VM developer. + +package throughput + +import ( + "context" + + "github.com/ava-labs/hypersdk/api/ws" + "github.com/ava-labs/hypersdk/auth" + "github.com/ava-labs/hypersdk/chain" + "github.com/ava-labs/hypersdk/codec" + "github.com/ava-labs/hypersdk/examples/morpheusvm/actions" + "github.com/ava-labs/hypersdk/examples/morpheusvm/vm" + "github.com/ava-labs/hypersdk/pubsub" + "github.com/ava-labs/hypersdk/throughput" + + mauth "github.com/ava-labs/hypersdk/examples/morpheusvm/auth" +) + +type SpamHelper struct { + KeyType string + cli *vm.JSONRPCClient + ws *ws.WebSocketClient +} + +var _ throughput.SpamHelper = &SpamHelper{} + +func (sh *SpamHelper) CreateAccount() (*auth.PrivateKey, error) { + return mauth.GeneratePrivateKey(sh.KeyType) +} + +func (sh *SpamHelper) CreateClient(uri string) error { + sh.cli = vm.NewJSONRPCClient(uri) + ws, err := ws.NewWebSocketClient(uri, ws.DefaultHandshakeTimeout, pubsub.MaxPendingMessages, pubsub.MaxReadMessageSize) + if err != nil { + return err + } + sh.ws = ws + return nil +} + +func (sh *SpamHelper) GetParser(ctx context.Context) (chain.Parser, error) { + return sh.cli.Parser(ctx) +} + +func (sh *SpamHelper) LookupBalance(address codec.Address) (uint64, error) { + balance, err := sh.cli.Balance(context.TODO(), address) + if err != nil { + return 0, err + } + + return balance, err +} + +func (*SpamHelper) GetTransfer(address codec.Address, amount uint64, memo []byte) []chain.Action { + return []chain.Action{&actions.Transfer{ + To: address, + Value: amount, + Memo: memo, + }} +} diff --git a/examples/vmwithcontracts/cmd/vmwithcontracts-cli/cmd/handler.go b/examples/vmwithcontracts/cmd/vmwithcontracts-cli/cmd/handler.go index 142ec89b00..2d31491c15 100644 --- a/examples/vmwithcontracts/cmd/vmwithcontracts-cli/cmd/handler.go +++ b/examples/vmwithcontracts/cmd/vmwithcontracts-cli/cmd/handler.go @@ -38,7 +38,7 @@ func (h *Handler) Root() *cli.Handler { } func (h *Handler) DefaultActor() ( - ids.ID, *cli.PrivateKey, chain.AuthFactory, + ids.ID, *auth.PrivateKey, chain.AuthFactory, *jsonrpc.JSONRPCClient, *vm.JSONRPCClient, *ws.WebSocketClient, error, ) { addr, priv, err := h.h.GetDefaultKey(true) @@ -73,7 +73,7 @@ func (h *Handler) DefaultActor() ( return ids.Empty, nil, nil, nil, nil, nil, err } // For [defaultActor], we always send requests to the first returned URI. - return chainID, &cli.PrivateKey{ + return chainID, &auth.PrivateKey{ Address: addr, Bytes: priv, }, factory, jcli, diff --git a/examples/vmwithcontracts/cmd/vmwithcontracts-cli/cmd/key.go b/examples/vmwithcontracts/cmd/vmwithcontracts-cli/cmd/key.go index 000c092b31..950d0cb166 100644 --- a/examples/vmwithcontracts/cmd/vmwithcontracts-cli/cmd/key.go +++ b/examples/vmwithcontracts/cmd/vmwithcontracts-cli/cmd/key.go @@ -9,7 +9,6 @@ import ( "github.com/spf13/cobra" "github.com/ava-labs/hypersdk/auth" - "github.com/ava-labs/hypersdk/cli" "github.com/ava-labs/hypersdk/crypto/bls" "github.com/ava-labs/hypersdk/crypto/ed25519" "github.com/ava-labs/hypersdk/crypto/secp256r1" @@ -31,14 +30,14 @@ func checkKeyType(k string) error { } } -func generatePrivateKey(k string) (*cli.PrivateKey, error) { +func generatePrivateKey(k string) (*auth.PrivateKey, error) { switch k { case ed25519Key: p, err := ed25519.GeneratePrivateKey() if err != nil { return nil, err } - return &cli.PrivateKey{ + return &auth.PrivateKey{ Address: auth.NewED25519Address(p.PublicKey()), Bytes: p[:], }, nil @@ -47,7 +46,7 @@ func generatePrivateKey(k string) (*cli.PrivateKey, error) { if err != nil { return nil, err } - return &cli.PrivateKey{ + return &auth.PrivateKey{ Address: auth.NewSECP256R1Address(p.PublicKey()), Bytes: p[:], }, nil @@ -56,7 +55,7 @@ func generatePrivateKey(k string) (*cli.PrivateKey, error) { if err != nil { return nil, err } - return &cli.PrivateKey{ + return &auth.PrivateKey{ Address: auth.NewBLSAddress(bls.PublicFromPrivateKey(p)), Bytes: bls.PrivateKeyToBytes(p), }, nil @@ -65,7 +64,7 @@ func generatePrivateKey(k string) (*cli.PrivateKey, error) { } } -func loadPrivateKey(k string, path string) (*cli.PrivateKey, error) { +func loadPrivateKey(k string, path string) (*auth.PrivateKey, error) { switch k { case ed25519Key: p, err := utils.LoadBytes(path, ed25519.PrivateKeyLen) @@ -73,7 +72,7 @@ func loadPrivateKey(k string, path string) (*cli.PrivateKey, error) { return nil, err } pk := ed25519.PrivateKey(p) - return &cli.PrivateKey{ + return &auth.PrivateKey{ Address: auth.NewED25519Address(pk.PublicKey()), Bytes: p, }, nil @@ -83,7 +82,7 @@ func loadPrivateKey(k string, path string) (*cli.PrivateKey, error) { return nil, err } pk := secp256r1.PrivateKey(p) - return &cli.PrivateKey{ + return &auth.PrivateKey{ Address: auth.NewSECP256R1Address(pk.PublicKey()), Bytes: p, }, nil @@ -97,7 +96,7 @@ func loadPrivateKey(k string, path string) (*cli.PrivateKey, error) { if err != nil { return nil, err } - return &cli.PrivateKey{ + return &auth.PrivateKey{ Address: auth.NewBLSAddress(bls.PublicFromPrivateKey(privKey)), Bytes: p, }, nil diff --git a/examples/vmwithcontracts/cmd/vmwithcontracts-cli/cmd/spam.go b/examples/vmwithcontracts/cmd/vmwithcontracts-cli/cmd/spam.go index 4e6e53b8e2..f13783ef36 100644 --- a/examples/vmwithcontracts/cmd/vmwithcontracts-cli/cmd/spam.go +++ b/examples/vmwithcontracts/cmd/vmwithcontracts-cli/cmd/spam.go @@ -11,16 +11,10 @@ import ( "github.com/ava-labs/hypersdk/api/ws" "github.com/ava-labs/hypersdk/auth" "github.com/ava-labs/hypersdk/chain" - "github.com/ava-labs/hypersdk/cli" "github.com/ava-labs/hypersdk/codec" - "github.com/ava-labs/hypersdk/crypto/bls" - "github.com/ava-labs/hypersdk/crypto/ed25519" - "github.com/ava-labs/hypersdk/crypto/secp256r1" "github.com/ava-labs/hypersdk/examples/vmwithcontracts/actions" - "github.com/ava-labs/hypersdk/examples/vmwithcontracts/consts" "github.com/ava-labs/hypersdk/examples/vmwithcontracts/vm" "github.com/ava-labs/hypersdk/pubsub" - "github.com/ava-labs/hypersdk/utils" ) type SpamHelper struct { @@ -29,27 +23,10 @@ type SpamHelper struct { ws *ws.WebSocketClient } -func (sh *SpamHelper) CreateAccount() (*cli.PrivateKey, error) { +func (sh *SpamHelper) CreateAccount() (*auth.PrivateKey, error) { return generatePrivateKey(sh.keyType) } -func (*SpamHelper) GetFactory(pk *cli.PrivateKey) (chain.AuthFactory, error) { - switch pk.Address[0] { - case auth.ED25519ID: - return auth.NewED25519Factory(ed25519.PrivateKey(pk.Bytes)), nil - case auth.SECP256R1ID: - return auth.NewSECP256R1Factory(secp256r1.PrivateKey(pk.Bytes)), nil - case auth.BLSID: - p, err := bls.PrivateKeyFromBytes(pk.Bytes) - if err != nil { - return nil, err - } - return auth.NewBLSFactory(p), nil - default: - return nil, ErrInvalidKeyType - } -} - func (sh *SpamHelper) CreateClient(uri string) error { sh.cli = vm.NewJSONRPCClient(uri) ws, err := ws.NewWebSocketClient(uri, ws.DefaultHandshakeTimeout, pubsub.MaxPendingMessages, pubsub.MaxReadMessageSize) @@ -64,18 +41,12 @@ func (sh *SpamHelper) GetParser(ctx context.Context) (chain.Parser, error) { return sh.cli.Parser(ctx) } -func (sh *SpamHelper) LookupBalance(choice int, address codec.Address) (uint64, error) { +func (sh *SpamHelper) LookupBalance(address codec.Address) (uint64, error) { balance, err := sh.cli.Balance(context.TODO(), address) if err != nil { return 0, err } - utils.Outf( - "%d) {{cyan}}address:{{/}} %s {{cyan}}balance:{{/}} %s %s\n", - choice, - address, - utils.FormatBalance(balance), - consts.Symbol, - ) + return balance, err } @@ -103,6 +74,7 @@ var runSpamCmd = &cobra.Command{ return checkKeyType(args[0]) }, RunE: func(_ *cobra.Command, args []string) error { - return handler.Root().Spam(&SpamHelper{keyType: args[0]}) + ctx := context.Background() + return handler.Root().Spam(ctx, &SpamHelper{keyType: args[0]}, false) }, } diff --git a/examples/vmwithcontracts/tests/e2e/e2e_test.go b/examples/vmwithcontracts/tests/e2e/e2e_test.go index 6cfcbb19d7..e61cb30a35 100644 --- a/examples/vmwithcontracts/tests/e2e/e2e_test.go +++ b/examples/vmwithcontracts/tests/e2e/e2e_test.go @@ -50,7 +50,7 @@ var _ = ginkgo.SynchronizedBeforeSuite(func() []byte { // Import HyperSDK e2e test coverage and inject VMWithContracts name // and workload factory to orchestrate the test. - he2e.SetWorkload(consts.Name, workloadFactory, parser, expectedABI) + he2e.SetWorkload(consts.Name, workloadFactory, expectedABI, parser, nil, nil) tc := e2e.NewTestContext() diff --git a/tests/e2e/e2e.go b/tests/e2e/e2e.go index 085b4cb5e5..f1d8e4e0d6 100644 --- a/tests/e2e/e2e.go +++ b/tests/e2e/e2e.go @@ -18,8 +18,10 @@ import ( "github.com/ava-labs/hypersdk/abi" "github.com/ava-labs/hypersdk/api/jsonrpc" "github.com/ava-labs/hypersdk/api/state" + "github.com/ava-labs/hypersdk/auth" "github.com/ava-labs/hypersdk/chain" "github.com/ava-labs/hypersdk/tests/workload" + "github.com/ava-labs/hypersdk/throughput" "github.com/ava-labs/hypersdk/utils" ginkgo "github.com/onsi/ginkgo/v2" @@ -30,13 +32,17 @@ var ( txWorkloadFactory workload.TxWorkloadFactory parser chain.Parser expectedABI abi.ABI + spamKey *auth.PrivateKey + spamHelper throughput.SpamHelper ) -func SetWorkload(name string, factory workload.TxWorkloadFactory, chainParser chain.Parser, abi abi.ABI) { +func SetWorkload(name string, factory workload.TxWorkloadFactory, abi abi.ABI, chainParser chain.Parser, sh throughput.SpamHelper, key *auth.PrivateKey) { vmName = name txWorkloadFactory = factory parser = chainParser expectedABI = abi + spamHelper = sh + spamKey = key } var _ = ginkgo.Describe("[HyperSDK APIs]", func() { @@ -109,6 +115,39 @@ var _ = ginkgo.Describe("[HyperSDK Tx Workloads]", func() { }) }) +var _ = ginkgo.Describe("[HyperSDK Spam Workloads]", func() { + ginkgo.It("Spam Workload", func() { + if spamKey == nil || spamHelper == nil { + return + } + + tc := e2e.NewTestContext() + require := require.New(tc) + blockchainID := e2e.GetEnv(tc).GetNetwork().GetSubnet(vmName).Chains[0].ChainID + + // Spam Args + uris := getE2EURIs(tc, blockchainID) + key := spamKey + sZipf := 1.01 + vZipf := 2.7 + txsPerSecond := 500 + minTxsPerSecond := 100 + txsPerSecondStep := 200 + numClients := 10 + numAccounts := 25 + + // run spammer + err := spamHelper.CreateClient(uris[0]) + require.NoError(err) + balance, err := spamHelper.LookupBalance(key.Address) + require.NoError(err) + + spammer := throughput.NewSpammer(uris, key, balance, sZipf, vZipf, txsPerSecond, minTxsPerSecond, txsPerSecondStep, numClients, numAccounts) + err = spammer.Spam(tc.DefaultContext(), spamHelper, true, "AVAX") + require.NoError(err) + }) +}) + var _ = ginkgo.Describe("[HyperSDK Syncing]", func() { ginkgo.It("[Sync]", func() { tc := e2e.NewTestContext() diff --git a/throughput/errors.go b/throughput/errors.go new file mode 100644 index 0000000000..8c210cbd8d --- /dev/null +++ b/throughput/errors.go @@ -0,0 +1,8 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package throughput + +import "errors" + +var ErrTxFailed = errors.New("tx failed on-chain") diff --git a/throughput/helper.go b/throughput/helper.go new file mode 100644 index 0000000000..b616ba6310 --- /dev/null +++ b/throughput/helper.go @@ -0,0 +1,40 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package throughput + +import ( + "context" + + "github.com/ava-labs/hypersdk/auth" + "github.com/ava-labs/hypersdk/chain" + "github.com/ava-labs/hypersdk/codec" +) + +type SpamHelper interface { + // CreateAccount generates a new account and returns the [PrivateKey]. + // + // The spammer tracks all created accounts and orchestrates the return of funds + // sent to any created accounts on shutdown. If the spammer exits ungracefully, + // any funds sent to created accounts will be lost unless they are persisted by + // the [SpamHelper] implementation. + CreateAccount() (*auth.PrivateKey, error) + + // CreateClient instructs the [SpamHelper] to create and persist a VM-specific + // JSONRPC client. + // + // This client is used to retrieve the [chain.Parser] and the balance + // of arbitrary addresses. + // + // TODO: consider making these functions part of the required JSONRPC + // interface for the HyperSDK. + CreateClient(uri string) error + GetParser(ctx context.Context) (chain.Parser, error) + LookupBalance(address codec.Address) (uint64, error) + + // GetTransfer returns a list of actions that sends [amount] to a given [address]. + // + // Memo is used to ensure that each transaction is unique (even if between the same + // sender and receiver for the same amount). + GetTransfer(address codec.Address, amount uint64, memo []byte) []chain.Action +} diff --git a/throughput/issuer.go b/throughput/issuer.go new file mode 100644 index 0000000000..f74f2e3209 --- /dev/null +++ b/throughput/issuer.go @@ -0,0 +1,171 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package throughput + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "golang.org/x/exp/rand" + + "github.com/ava-labs/hypersdk/api/jsonrpc" + "github.com/ava-labs/hypersdk/api/ws" + "github.com/ava-labs/hypersdk/chain" + "github.com/ava-labs/hypersdk/pubsub" + "github.com/ava-labs/hypersdk/utils" +) + +type issuer struct { + i int + uri string + parser chain.Parser + + // TODO: clean up potential race conditions here. + l sync.Mutex + cli *jsonrpc.JSONRPCClient + ws *ws.WebSocketClient + outstandingTxs int + abandoned error +} + +func (i *issuer) Start(ctx context.Context) { + issuerWg.Add(1) + go func() { + for { + _, wsErr, result, err := i.ws.ListenTx(context.TODO()) + if err != nil { + return + } + i.l.Lock() + i.outstandingTxs-- + i.l.Unlock() + inflight.Add(-1) + l.Lock() + if result != nil { + if result.Success { + confirmedTxs++ + } else { + utils.Outf("{{orange}}on-chain tx failure:{{/}} %s %t\n", string(result.Error), result.Success) + } + } else { + // We can't error match here because we receive it over the wire. + if !strings.Contains(wsErr.Error(), ws.ErrExpired.Error()) { + utils.Outf("{{orange}}pre-execute tx failure:{{/}} %v\n", wsErr) + } + } + totalTxs++ + l.Unlock() + } + }() + go func() { + defer func() { + _ = i.ws.Close() + issuerWg.Done() + }() + + <-ctx.Done() + start := time.Now() + for time.Since(start) < issuerShutdownTimeout { + if i.ws.Closed() { + return + } + i.l.Lock() + outstanding := i.outstandingTxs + i.l.Unlock() + if outstanding == 0 { + return + } + utils.Outf("{{orange}}waiting for issuer %d to finish:{{/}} %d\n", i.i, outstanding) + time.Sleep(time.Second) + } + utils.Outf("{{orange}}issuer %d shutdown timeout{{/}}\n", i.i) + }() +} + +func (i *issuer) Send(ctx context.Context, actions []chain.Action, factory chain.AuthFactory, feePerTx uint64) error { + // Construct transaction + _, tx, err := i.cli.GenerateTransactionManual(i.parser, actions, factory, feePerTx) + if err != nil { + utils.Outf("{{orange}}failed to generate tx:{{/}} %v\n", err) + return fmt.Errorf("failed to generate tx: %w", err) + } + + // Increase outstanding txs for issuer + i.l.Lock() + i.outstandingTxs++ + i.l.Unlock() + inflight.Add(1) + + // Register transaction and recover upon failure + if err := i.ws.RegisterTx(tx); err != nil { + i.l.Lock() + if i.ws.Closed() { + if i.abandoned != nil { + i.l.Unlock() + return i.abandoned + } + + // Attempt to recreate issuer + utils.Outf("{{orange}}re-creating issuer:{{/}} %d {{orange}}uri:{{/}} %s\n", i.i, i.uri) + ws, err := ws.NewWebSocketClient(i.uri, ws.DefaultHandshakeTimeout, pubsub.MaxPendingMessages, pubsub.MaxReadMessageSize) // we write the max read + if err != nil { + i.abandoned = err + utils.Outf("{{orange}}could not re-create closed issuer:{{/}} %v\n", err) + i.l.Unlock() + return err + } + i.ws = ws + i.l.Unlock() + + i.Start(ctx) + utils.Outf("{{green}}re-created closed issuer:{{/}} %d\n", i.i) + } + + // If issuance fails during retry, we should fail + return i.ws.RegisterTx(tx) + } + return nil +} + +func getRandomIssuer(issuers []*issuer) *issuer { + index := rand.Int() % len(issuers) + return issuers[index] +} + +func (i *issuer) logStats(cctx context.Context) { + // Log stats + t := time.NewTicker(1 * time.Second) // ensure no duplicates created + var psent int64 + go func() { + defer t.Stop() + for { + select { + case <-t.C: + current := sent.Load() + l.Lock() + if totalTxs > 0 { + unitPrices, err := i.cli.UnitPrices(cctx, false) + if err != nil { + continue + } + utils.Outf( + "{{yellow}}txs seen:{{/}} %d {{yellow}}success rate:{{/}} %.2f%% {{yellow}}inflight:{{/}} %d {{yellow}}issued/s:{{/}} %d {{yellow}}unit prices:{{/}} [%s]\n", //nolint:lll + totalTxs, + float64(confirmedTxs)/float64(totalTxs)*100, + inflight.Load(), + current-psent, + unitPrices, + ) + } + l.Unlock() + psent = current + case <-cctx.Done(): + return + } + } + }() +} diff --git a/throughput/pacer.go b/throughput/pacer.go new file mode 100644 index 0000000000..ed199518f0 --- /dev/null +++ b/throughput/pacer.go @@ -0,0 +1,59 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package throughput + +import ( + "context" + "fmt" + + "github.com/ava-labs/hypersdk/api/ws" + "github.com/ava-labs/hypersdk/chain" +) + +type pacer struct { + ws *ws.WebSocketClient + + inflight chan struct{} + done chan error +} + +func (p *pacer) Run(ctx context.Context, max int) { + p.inflight = make(chan struct{}, max) + p.done = make(chan error) + + for range p.inflight { + _, wsErr, result, err := p.ws.ListenTx(ctx) + if err != nil { + p.done <- err + return + } + if wsErr != nil { + p.done <- wsErr + return + } + if !result.Success { + // Should never happen + p.done <- fmt.Errorf("%w: %s", ErrTxFailed, result.Error) + return + } + } + p.done <- nil +} + +func (p *pacer) Add(tx *chain.Transaction) error { + if err := p.ws.RegisterTx(tx); err != nil { + return err + } + select { + case p.inflight <- struct{}{}: + return nil + case err := <-p.done: + return err + } +} + +func (p *pacer) Wait() error { + close(p.inflight) + return <-p.done +} diff --git a/throughput/spam.go b/throughput/spam.go new file mode 100644 index 0000000000..b15b42b601 --- /dev/null +++ b/throughput/spam.go @@ -0,0 +1,426 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package throughput + +import ( + "context" + "fmt" + "math/rand" + "os" + "os/signal" + "runtime" + "sync" + "sync/atomic" + "syscall" + "time" + + "github.com/ava-labs/avalanchego/utils/set" + "golang.org/x/sync/errgroup" + + "github.com/ava-labs/hypersdk/api/jsonrpc" + "github.com/ava-labs/hypersdk/api/ws" + "github.com/ava-labs/hypersdk/auth" + "github.com/ava-labs/hypersdk/chain" + "github.com/ava-labs/hypersdk/codec" + "github.com/ava-labs/hypersdk/consts" + "github.com/ava-labs/hypersdk/fees" + "github.com/ava-labs/hypersdk/pubsub" + "github.com/ava-labs/hypersdk/utils" +) + +const ( + amountToTransfer = 1 + pendingTargetMultiplier = 10 + successfulRunsToIncreaseTarget = 10 + failedRunsToDecreaseTarget = 5 + + issuerShutdownTimeout = 60 * time.Second +) + +// TODO: remove the use of global variables +var ( + maxConcurrency = runtime.NumCPU() + issuerWg sync.WaitGroup + + l sync.Mutex + confirmedTxs uint64 + totalTxs uint64 + + inflight atomic.Int64 + sent atomic.Int64 +) + +type Spammer struct { + uris []string + key *auth.PrivateKey + balance uint64 + + // Zipf distribution parameters + zipfSeed *rand.Rand + sZipf float64 + vZipf float64 + + // TPS parameters + txsPerSecond int + minTxsPerSecond int + txsPerSecondStep int + numClients int // Number of clients per uri node + + // Number of accounts + numAccounts int +} + +func NewSpammer( + uris []string, + key *auth.PrivateKey, + balance uint64, + sZipf, vZipf float64, + txsPerSecond, minTxsPerSecond, txsPerSecondStep, numClients, numAccounts int, +) *Spammer { + // Log Zipf participants + zipfSeed := rand.New(rand.NewSource(0)) //nolint:gosec + + return &Spammer{ + uris, + key, + balance, + zipfSeed, + sZipf, + vZipf, + txsPerSecond, + minTxsPerSecond, + txsPerSecondStep, + numClients, + numAccounts, + } +} + +// Spam tests the throughput of the network by sending transactions using +// multiple accounts and clients. It first distributes funds to the accounts +// and then sends transactions between the accounts. It returns the funds to +// the original account after the test is complete. +// [sh] injects the necessary functions to interact with the network. +// [terminate] if true, the spammer will stop after reaching the target TPS. +// [symbol] and [decimals] are used to format the output. +func (s *Spammer) Spam(ctx context.Context, sh SpamHelper, terminate bool, symbol string) error { + // log distribution + s.logZipf(s.zipfSeed) + + // new JSONRPC client + cli := jsonrpc.NewJSONRPCClient(s.uris[0]) + + factory, err := auth.GetFactory(s.key) + if err != nil { + return err + } + + // Compute max units + parser, err := sh.GetParser(ctx) + if err != nil { + return err + } + actions := sh.GetTransfer(s.key.Address, 0, uniqueBytes()) + maxUnits, err := chain.EstimateUnits(parser.Rules(time.Now().UnixMilli()), actions, factory) + if err != nil { + return err + } + + unitPrices, err := cli.UnitPrices(ctx, false) + if err != nil { + return err + } + feePerTx, err := fees.MulSum(unitPrices, maxUnits) + if err != nil { + return err + } + + var fundsL sync.Mutex + // distribute funds + accounts, funds, factories, err := s.distributeFunds(ctx, cli, parser, feePerTx, sh) + if err != nil { + return err + } + + // create issuers + issuers, err := s.createIssuers(parser) + if err != nil { + return err + } + + // make sure we can exit gracefully & return funds + signals := make(chan os.Signal, 2) + signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) + + cctx, cancel := context.WithCancel(ctx) + defer cancel() + for _, issuer := range issuers { + issuer.Start(cctx) + } + + // set logging + issuers[0].logStats(cctx) + + // Broadcast txs + var ( + // Do not call this function concurrently (math.Rand is not safe for concurrent use) + z = rand.NewZipf(s.zipfSeed, s.sZipf, s.vZipf, uint64(s.numAccounts)-1) + + it = time.NewTimer(0) + currentTarget = min(s.txsPerSecond, s.minTxsPerSecond) + consecutiveUnderBacklog int + consecutiveAboveBacklog int + + stop bool + ) + utils.Outf("{{cyan}}initial target tps:{{/}} %d\n", currentTarget) + for !stop { + select { + case <-it.C: + start := time.Now() + + // Check to see if we should wait for pending txs + if int64(currentTarget)+inflight.Load() > int64(currentTarget*pendingTargetMultiplier) { + consecutiveUnderBacklog = 0 + consecutiveAboveBacklog++ + if consecutiveAboveBacklog >= failedRunsToDecreaseTarget { + if currentTarget > s.txsPerSecondStep { + currentTarget -= s.txsPerSecondStep + utils.Outf("{{cyan}}skipping issuance because large backlog detected, decreasing target tps:{{/}} %d\n", currentTarget) + } else { + utils.Outf("{{cyan}}skipping issuance because large backlog detected, cannot decrease target{{/}}\n") + } + consecutiveAboveBacklog = 0 + } + it.Reset(1 * time.Second) + break + } + + // Issue txs + g := &errgroup.Group{} + g.SetLimit(maxConcurrency) + for i := 0; i < currentTarget; i++ { + senderIndex, recipientIndex := z.Uint64(), z.Uint64() + sender := accounts[senderIndex] + if recipientIndex == senderIndex { + if recipientIndex == uint64(s.numAccounts-1) { + recipientIndex-- + } else { + recipientIndex++ + } + } + recipient := accounts[recipientIndex].Address + issuer := getRandomIssuer(issuers) + g.Go(func() error { + factory := factories[senderIndex] + fundsL.Lock() + balance := funds[sender.Address] + if feePerTx > balance { + fundsL.Unlock() + utils.Outf("{{orange}}tx has insufficient funds:{{/}} %s\n", sender.Address) + return fmt.Errorf("%s has insufficient funds", sender.Address) + } + funds[sender.Address] = balance - feePerTx - amountToTransfer + funds[recipient] += amountToTransfer + fundsL.Unlock() + + // Send transaction + actions := sh.GetTransfer(recipient, amountToTransfer, uniqueBytes()) + return issuer.Send(cctx, actions, factory, feePerTx) + }) + } + + // Wait for txs to finish + if err := g.Wait(); err != nil { + // We don't return here because we want to return funds + utils.Outf("{{orange}}broadcast loop error:{{/}} %v\n", err) + stop = true + break + } + + // Determine how long to sleep + dur := time.Since(start) + sleep := max(float64(consts.MillisecondsPerSecond-dur.Milliseconds()), 0) + it.Reset(time.Duration(sleep) * time.Millisecond) + + // Check to see if we should increase target + consecutiveAboveBacklog = 0 + consecutiveUnderBacklog++ + // once desired TPS is reached, stop the spammer + if terminate && currentTarget == s.txsPerSecond && consecutiveUnderBacklog >= successfulRunsToIncreaseTarget { + utils.Outf("{{green}}reached target tps:{{/}} %d\n", currentTarget) + // Cancel the context to stop the issuers + cancel() + } else if consecutiveUnderBacklog >= successfulRunsToIncreaseTarget && currentTarget < s.txsPerSecond { + currentTarget = min(currentTarget+s.txsPerSecondStep, s.txsPerSecond) + utils.Outf("{{cyan}}increasing target tps:{{/}} %d\n", currentTarget) + consecutiveUnderBacklog = 0 + } + case <-cctx.Done(): + stop = true + utils.Outf("{{yellow}}context canceled{{/}}\n") + case <-signals: + stop = true + utils.Outf("{{yellow}}exiting broadcast loop{{/}}\n") + cancel() + } + } + // Wait for all issuers to finish + utils.Outf("{{yellow}}waiting for issuers to return{{/}}\n") + issuerWg.Wait() + maxUnits, err = chain.EstimateUnits(parser.Rules(time.Now().UnixMilli()), actions, factory) + if err != nil { + return err + } + return s.returnFunds(ctx, cli, parser, maxUnits, sh, accounts, factories, funds, symbol) +} + +func (s *Spammer) logZipf(zipfSeed *rand.Rand) { + zz := rand.NewZipf(zipfSeed, s.sZipf, s.vZipf, uint64(s.numAccounts)-1) + trials := s.txsPerSecond * 60 * 2 // sender/receiver + unique := set.NewSet[uint64](trials) + for i := 0; i < trials; i++ { + unique.Add(zz.Uint64()) + } + utils.Outf("{{blue}}unique participants expected every 60s:{{/}} %d\n", unique.Len()) +} + +// createIssuer creates an [numClients] transaction issuers for each URI in [uris] +func (s *Spammer) createIssuers(parser chain.Parser) ([]*issuer, error) { + issuers := []*issuer{} + for i := 0; i < len(s.uris); i++ { + for j := 0; j < s.numClients; j++ { + cli := jsonrpc.NewJSONRPCClient(s.uris[i]) + webSocketClient, err := ws.NewWebSocketClient(s.uris[i], ws.DefaultHandshakeTimeout, pubsub.MaxPendingMessages, pubsub.MaxReadMessageSize) // we write the max read + if err != nil { + return nil, err + } + issuer := &issuer{i: len(issuers), cli: cli, ws: webSocketClient, parser: parser, uri: s.uris[i]} + issuers = append(issuers, issuer) + } + } + return issuers, nil +} + +func (s *Spammer) distributeFunds(ctx context.Context, cli *jsonrpc.JSONRPCClient, parser chain.Parser, feePerTx uint64, sh SpamHelper) ([]*auth.PrivateKey, map[codec.Address]uint64, []chain.AuthFactory, error) { + withholding := feePerTx * uint64(s.numAccounts) + if s.balance < withholding { + return nil, nil, nil, fmt.Errorf("insufficient funds (have=%d need=%d)", s.balance, withholding) + } + + distAmount := (s.balance - withholding) / uint64(s.numAccounts) + + utils.Outf("{{yellow}}distributing funds to each account{{/}}\n") + + funds := map[codec.Address]uint64{} + accounts := make([]*auth.PrivateKey, s.numAccounts) + factories := make([]chain.AuthFactory, s.numAccounts) + + factory, err := auth.GetFactory(s.key) + if err != nil { + return nil, nil, nil, err + } + + webSocketClient, err := ws.NewWebSocketClient(s.uris[0], ws.DefaultHandshakeTimeout, pubsub.MaxPendingMessages, pubsub.MaxReadMessageSize) // we write the max read + if err != nil { + return nil, nil, nil, err + } + p := &pacer{ws: webSocketClient} + go p.Run(ctx, s.minTxsPerSecond) + // TODO: we sleep here because occasionally the pacer will hang. Potentially due to + // p.wait() closing the inflight channel before the tx is registered/sent. Debug more. + time.Sleep(3 * time.Second) + for i := 0; i < s.numAccounts; i++ { + // Create account + pk, err := sh.CreateAccount() + if err != nil { + return nil, nil, nil, err + } + accounts[i] = pk + f, err := auth.GetFactory(pk) + if err != nil { + return nil, nil, nil, err + } + factories[i] = f + + // Send funds + actions := sh.GetTransfer(pk.Address, distAmount, uniqueBytes()) + _, tx, err := cli.GenerateTransactionManual(parser, actions, factory, feePerTx) + if err != nil { + return nil, nil, nil, err + } + if err := p.Add(tx); err != nil { + return nil, nil, nil, fmt.Errorf("%w: failed to register tx", err) + } + funds[pk.Address] = distAmount + + // Log progress + if i%250 == 0 && i > 0 { + utils.Outf("{{yellow}}issued transfer to %d accounts{{/}}\n", i) + } + } + if err := p.Wait(); err != nil { + return nil, nil, nil, err + } + utils.Outf("{{yellow}}distributed funds to %d accounts{{/}}\n", s.numAccounts) + + return accounts, funds, factories, nil +} + +func (s *Spammer) returnFunds(ctx context.Context, cli *jsonrpc.JSONRPCClient, parser chain.Parser, maxUnits fees.Dimensions, sh SpamHelper, accounts []*auth.PrivateKey, factories []chain.AuthFactory, funds map[codec.Address]uint64, symbol string) error { + // Return funds + unitPrices, err := cli.UnitPrices(ctx, false) + if err != nil { + return err + } + feePerTx, err := fees.MulSum(unitPrices, maxUnits) + if err != nil { + return err + } + utils.Outf("{{yellow}}returning funds to %s{{/}}\n", s.key.Address) + var returnedBalance uint64 + + webSocketClient, err := ws.NewWebSocketClient(s.uris[0], ws.DefaultHandshakeTimeout, pubsub.MaxPendingMessages, pubsub.MaxReadMessageSize) // we write the max read + if err != nil { + return err + } + p := &pacer{ws: webSocketClient} + go p.Run(ctx, s.minTxsPerSecond) + // TODO: we sleep here because occasionally the pacer will hang. Potentially due to + // p.wait() closing the inflight channel before the tx is registered/sent. Debug more. + time.Sleep(3 * time.Second) + for i := 0; i < s.numAccounts; i++ { + // Determine if we should return funds + balance := funds[accounts[i].Address] + if feePerTx > balance { + continue + } + + // Send funds + returnAmt := balance - feePerTx + actions := sh.GetTransfer(s.key.Address, returnAmt, uniqueBytes()) + _, tx, err := cli.GenerateTransactionManual(parser, actions, factories[i], feePerTx) + if err != nil { + return err + } + if err := p.Add(tx); err != nil { + return err + } + returnedBalance += returnAmt + + if i%250 == 0 && i > 0 { + utils.Outf("{{yellow}}checked %d accounts for fund return{{/}}\n", i) + } + utils.Outf("{{yellow}}returning funds to %s:{{/}} %s %s\n", accounts[i].Address, utils.FormatBalance(returnAmt), symbol) + } + if err := p.Wait(); err != nil { + utils.Outf("{{orange}}failed to return funds:{{/}} %v\n", err) + return err + } + utils.Outf( + "{{yellow}}returned funds:{{/}} %s %s\n", + utils.FormatBalance(returnedBalance), + symbol, + ) + return nil +} diff --git a/throughput/utils.go b/throughput/utils.go new file mode 100644 index 0000000000..c1e072db2f --- /dev/null +++ b/throughput/utils.go @@ -0,0 +1,12 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package throughput + +import ( + "encoding/binary" +) + +func uniqueBytes() []byte { + return binary.BigEndian.AppendUint64(nil, uint64(sent.Add(1))) +}