Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[crypto/ed25519] ed25519 Concurrent Batch Verification + ZIP-215 Enforcement #338

Merged
merged 10 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions chain/auth_batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright (C) 2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package chain

import (
"github.com/ava-labs/hypersdk/workers"
)

const authWorkerBacklog = 16_384

// Adding a signature to a verification batch
// may perform complex cryptographic operations. We should
// not block the caller when this happens and we should
// not require each batch package to re-implement this logic.
type AuthBatch struct {
vm VM
job *workers.Job
bvs map[uint8]*authBatchWorker
}

func NewAuthBatch(vm VM, job *workers.Job, authTypes map[uint8]int) *AuthBatch {
bvs := map[uint8]*authBatchWorker{}
for t, count := range authTypes {
bv, ok := vm.GetAuthBatchVerifier(t, job.Workers(), count)
if !ok {
continue
}
bw := &authBatchWorker{
vm,
job,
bv,
make(chan *authBatchObject, authWorkerBacklog),
make(chan struct{}),
}
go bw.start()
bvs[t] = bw
}
return &AuthBatch{vm, job, bvs}
}

func (a *AuthBatch) Add(digest []byte, auth Auth) {
// If batch doesn't exist for auth, just add verify right to job and start
// processing.
bv, ok := a.bvs[auth.GetTypeID()]
if !ok {
a.job.Go(func() error { return auth.AsyncVerify(digest) })
return
}
bv.items <- &authBatchObject{digest, auth}
}

func (a *AuthBatch) Done(f func()) {
for _, bw := range a.bvs {
close(bw.items)
<-bw.done

for _, item := range bw.bv.Done() {
a.job.Go(item)
a.vm.Logger().Debug("enqueued batch for processing during done")
}
}
a.job.Done(f)
}

type authBatchObject struct {
digest []byte
auth Auth
}

type authBatchWorker struct {
vm VM
job *workers.Job
bv AuthBatchVerifier
items chan *authBatchObject
done chan struct{}
}

func (b *authBatchWorker) start() {
defer close(b.done)

for object := range b.items {
if j := b.bv.Add(object.digest, object.auth); j != nil {
// May finish parts of batch early, let's start computing them as soon as possible
b.job.Go(j)
b.vm.Logger().Debug("enqueued batch for processing during add")
}
}
}
33 changes: 26 additions & 7 deletions chain/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ type StatefulBlock struct {
WarpResults set.Bits64 `json:"warpResults"`

size int

// authCounts can be used by batch signature verification
// to preallocate memory
authCounts map[uint8]int
}

func (b *StatefulBlock) Size() int {
Expand Down Expand Up @@ -147,23 +151,32 @@ func (b *StatelessBlock) populateTxs(ctx context.Context) error {
defer span.End()

// Setup signature verification job
_, sigVerifySpan := b.vm.Tracer().Start(ctx, "StatelessBlock.verifySignatures")
job, err := b.vm.Workers().NewJob(len(b.Txs))
if err != nil {
return err
}
b.sigJob = job
batchVerifier := NewAuthBatch(b.vm, b.sigJob, b.authCounts)

// Process transactions
_, sspan := b.vm.Tracer().Start(ctx, "StatelessBlock.verifySignatures")
// Confirm no transaction duplicates and setup
// AWM processing
b.txsSet = set.NewSet[ids.ID](len(b.Txs))
b.warpMessages = map[ids.ID]*warpJob{}
for _, tx := range b.Txs {
b.sigJob.Go(tx.AuthAsyncVerify())
// Ensure there are no duplicate transactions
if b.txsSet.Contains(tx.ID()) {
return ErrDuplicateTx
}
b.txsSet.Add(tx.ID())

// Verify signature async
txDigest, err := tx.Digest()
if err != nil {
return err
}
batchVerifier.Add(txDigest, tx.Auth)

// Check if we need the block context to verify the block (which contains
// an Avalanche Warp Message)
//
Expand All @@ -187,7 +200,10 @@ func (b *StatelessBlock) populateTxs(ctx context.Context) error {
b.containsWarp = true
}
}
b.sigJob.Done(func() { sspan.End() })

// BatchVerifier is given the responsibility to call [b.sigJob.Done()] because it may add things
// to the work queue async and that may not have completed by this point.
go batchVerifier.Done(func() { sigVerifySpan.End() })
return nil
}

Expand Down Expand Up @@ -453,10 +469,10 @@ func (b *StatelessBlock) innerVerify(ctx context.Context) (merkledb.TrieView, er
)
return nil, ErrMissingBlockContext
}
_, sspan := b.vm.Tracer().Start(ctx, "StatelessBlock.verifyWarpMessages")
_, warpVerifySpan := b.vm.Tracer().Start(ctx, "StatelessBlock.verifyWarpMessages")
b.vdrState = b.vm.ValidatorState()
go func() {
defer sspan.End()
defer warpVerifySpan.End()
// We don't use [b.vm.Workers] here because we need the warp verification
// results during normal execution. If we added a job to the workers queue,
// it would get executed after all signatures. Additionally, BLS
Expand Down Expand Up @@ -776,10 +792,12 @@ func (b *StatefulBlock) Marshal() ([]byte, error) {
p.PackWindow(b.UnitWindow)

p.PackInt(len(b.Txs))
b.authCounts = map[uint8]int{}
for _, tx := range b.Txs {
if err := tx.Marshal(p); err != nil {
return nil, err
}
b.authCounts[tx.Auth.GetTypeID()]++
}

p.PackID(b.StateRoot)
Expand Down Expand Up @@ -815,14 +833,15 @@ func UnmarshalBlock(raw []byte, parser Parser) (*StatefulBlock, error) {
txCount := p.UnpackInt(false) // can produce empty blocks
actionRegistry, authRegistry := parser.Registry()
b.Txs = []*Transaction{} // don't preallocate all to avoid DoS
b.authCounts = map[uint8]int{}
for i := 0; i < txCount; i++ {
tx, err := UnmarshalTx(p, actionRegistry, authRegistry)
if err != nil {
return nil, err
}
b.Txs = append(b.Txs, tx)
b.authCounts[tx.Auth.GetTypeID()]++
}

p.UnpackID(false, &b.StateRoot)
b.UnitsConsumed = p.UnpackUint64(false)
b.WarpResults = set.Bits64(p.UnpackUint64(false))
Expand Down
10 changes: 10 additions & 0 deletions chain/dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ type VM interface {
Tracer() trace.Tracer
Logger() logging.Logger

// We don't include this in registry because it would never be used
// by any client of the hypersdk.
GetAuthBatchVerifier(authTypeID uint8, cores int, count int) (AuthBatchVerifier, bool)

IsBootstrapped() bool
LastAcceptedBlock() *StatelessBlock
SetLastAccepted(*StatelessBlock) error
Expand Down Expand Up @@ -157,8 +161,14 @@ type Action interface {
Marshal(p *codec.Packer)
}

type AuthBatchVerifier interface {
Add([]byte, Auth) func() error
Done() []func() error
}

type Auth interface {
GetTypeID() uint8 // identify uniquely the auth

MaxUnits(Rules) uint64
ValidRange(Rules) (start int64, end int64) // -1 means no start/end

Expand Down
6 changes: 3 additions & 3 deletions cli/dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
package cli

import (
"github.com/ava-labs/hypersdk/crypto"
"github.com/ava-labs/hypersdk/crypto/ed25519"
)

type Controller interface {
DatabasePath() string
Symbol() string
Address(crypto.PublicKey) string
ParseAddress(string) (crypto.PublicKey, error)
Address(ed25519.PublicKey) string
ParseAddress(string) (ed25519.PublicKey, error)
}
8 changes: 4 additions & 4 deletions cli/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ import (
"context"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/hypersdk/crypto"
"github.com/ava-labs/hypersdk/crypto/ed25519"
"github.com/ava-labs/hypersdk/rpc"
"github.com/ava-labs/hypersdk/utils"
)

func (h *Handler) GenerateKey() error {
// TODO: encrypt key
priv, err := crypto.GeneratePrivateKey()
priv, err := ed25519.GeneratePrivateKey()
if err != nil {
return err
}
Expand All @@ -33,7 +33,7 @@ func (h *Handler) GenerateKey() error {
}

func (h *Handler) ImportKey(keyPath string) error {
priv, err := crypto.LoadKey(keyPath)
priv, err := ed25519.LoadKey(keyPath)
if err != nil {
return err
}
Expand Down Expand Up @@ -89,7 +89,7 @@ func (h *Handler) SetKey(lookupBalance func(int, string, string, uint32, ids.ID)
return h.StoreDefaultKey(key.PublicKey())
}

func (h *Handler) Balance(checkAllChains bool, promptAsset bool, printBalance func(crypto.PublicKey, string, uint32, ids.ID, ids.ID) error) error {
func (h *Handler) Balance(checkAllChains bool, promptAsset bool, printBalance func(ed25519.PublicKey, string, uint32, ids.ID, ids.ID) error) error {
priv, err := h.GetDefaultKey()
if err != nil {
return err
Expand Down
6 changes: 3 additions & 3 deletions cli/prompt.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import (

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/hypersdk/crypto"
"github.com/ava-labs/hypersdk/crypto/ed25519"
"github.com/ava-labs/hypersdk/utils"
"github.com/manifoldco/promptui"
)

func (h *Handler) PromptAddress(label string) (crypto.PublicKey, error) {
func (h *Handler) PromptAddress(label string) (ed25519.PublicKey, error) {
promptText := promptui.Prompt{
Label: label,
Validate: func(input string) error {
Expand All @@ -28,7 +28,7 @@ func (h *Handler) PromptAddress(label string) (crypto.PublicKey, error) {
}
recipient, err := promptText.Run()
if err != nil {
return crypto.EmptyPublicKey, err
return ed25519.EmptyPublicKey, err
}
recipient = strings.TrimSpace(recipient)
return h.c.ParseAddress(recipient)
Expand Down
22 changes: 11 additions & 11 deletions cli/spam.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/hypersdk/chain"
"github.com/ava-labs/hypersdk/consts"
"github.com/ava-labs/hypersdk/crypto"
"github.com/ava-labs/hypersdk/crypto/ed25519"
"github.com/ava-labs/hypersdk/pubsub"
"github.com/ava-labs/hypersdk/rpc"
"github.com/ava-labs/hypersdk/utils"
Expand Down Expand Up @@ -50,11 +50,11 @@ var (
func (h *Handler) Spam(
maxTxBacklog int, randomRecipient bool,
createClient func(string, uint32, ids.ID), // must save on caller side
getFactory func(crypto.PrivateKey) chain.AuthFactory,
getFactory func(ed25519.PrivateKey) chain.AuthFactory,
lookupBalance func(int, string) (uint64, error),
getParser func(context.Context, ids.ID) (chain.Parser, error),
getTransfer func(crypto.PublicKey, uint64) chain.Action,
submitDummy func(*rpc.JSONRPCClient, crypto.PrivateKey) func(context.Context, uint64) error,
getTransfer func(ed25519.PublicKey, uint64) chain.Action,
submitDummy func(*rpc.JSONRPCClient, ed25519.PrivateKey) func(context.Context, uint64) error,
) error {
ctx := context.Background()

Expand Down Expand Up @@ -121,20 +121,20 @@ func (h *Handler) Spam(
h.ValueString(ids.Empty, distAmount),
h.AssetString(ids.Empty),
)
accounts := make([]crypto.PrivateKey, numAccounts)
accounts := make([]ed25519.PrivateKey, numAccounts)
dcli, err := rpc.NewWebSocketClient(uris[0], rpc.DefaultHandshakeTimeout, pubsub.MaxPendingMessages, pubsub.MaxReadMessageSize) // we write the max read
if err != nil {
return err
}
funds := map[crypto.PublicKey]uint64{}
funds := map[ed25519.PublicKey]uint64{}
parser, err := getParser(ctx, chainID)
if err != nil {
return err
}
var fundsL sync.Mutex
for i := 0; i < numAccounts; i++ {
// Create account
pk, err := crypto.GeneratePrivateKey()
pk, err := ed25519.GeneratePrivateKey()
if err != nil {
return err
}
Expand Down Expand Up @@ -259,7 +259,7 @@ func (h *Handler) Spam(

// Send transaction
start := time.Now()
selected := map[crypto.PublicKey]int{}
selected := map[ed25519.PublicKey]int{}
for k := 0; k < numTxsPerAccount; k++ {
recipient, err := getNextRecipient(randomRecipient, i, accounts)
if err != nil {
Expand Down Expand Up @@ -463,11 +463,11 @@ func startIssuer(cctx context.Context, issuer *txIssuer) {
}()
}

func getNextRecipient(randomRecipient bool, self int, keys []crypto.PrivateKey) (crypto.PublicKey, error) {
func getNextRecipient(randomRecipient bool, self int, keys []ed25519.PrivateKey) (ed25519.PublicKey, error) {
if randomRecipient {
priv, err := crypto.GeneratePrivateKey()
priv, err := ed25519.GeneratePrivateKey()
if err != nil {
return crypto.EmptyPublicKey, err
return ed25519.EmptyPublicKey, err
}
return priv.PublicKey(), nil
}
Expand Down
Loading