[crypto] Batch verify proposer gossip #351

Merged
merged 8 commits on Aug 13, 2023
16 changes: 11 additions & 5 deletions chain/auth_batch.go
@@ -4,22 +4,28 @@
package chain

import (
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/hypersdk/workers"
)

const authWorkerBacklog = 16_384

type AuthVM interface {
Logger() logging.Logger
GetAuthBatchVerifier(authTypeID uint8, cores int, count int) (AuthBatchVerifier, bool)
}

// Adding a signature to a verification batch
// may perform complex cryptographic operations. We should
// not block the caller when this happens and we should
// not require each batch package to re-implement this logic.
type AuthBatch struct {
vm VM
job *workers.Job
vm AuthVM
job workers.Job
bvs map[uint8]*authBatchWorker
}

func NewAuthBatch(vm VM, job *workers.Job, authTypes map[uint8]int) *AuthBatch {
func NewAuthBatch(vm AuthVM, job workers.Job, authTypes map[uint8]int) *AuthBatch {
bvs := map[uint8]*authBatchWorker{}
for t, count := range authTypes {
bv, ok := vm.GetAuthBatchVerifier(t, job.Workers(), count)
@@ -69,8 +75,8 @@ type authBatchObject struct {
}

type authBatchWorker struct {
vm VM
job *workers.Job
vm AuthVM
job workers.Job
bv AuthBatchVerifier
items chan *authBatchObject
done chan struct{}
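For orientation, here is a minimal sketch (not part of the diff) of how the reworked AuthBatch is meant to be driven end to end, mirroring the pattern gossiper/proposer.go introduces further down. The verifyGossipedTxs wrapper and its arguments are placeholders; every call it makes appears elsewhere in this PR.

package example

import (
	"github.com/ava-labs/hypersdk/chain"
	"github.com/ava-labs/hypersdk/workers"
)

// verifyGossipedTxs sketches the flow wired up in gossiper/proposer.go below:
// build a serial job, feed every (digest, auth) pair to the batch verifier,
// then wait for all enqueued batches to complete.
func verifyGossipedTxs(vm chain.AuthVM, txs []*chain.Transaction, authCounts map[uint8]int) error {
	job, err := workers.NewSerial().NewJob(len(txs))
	if err != nil {
		return err
	}
	batchVerifier := chain.NewAuthBatch(vm, job, authCounts)
	for _, tx := range txs {
		digest, err := tx.Digest()
		if err != nil {
			// Always release the job, even on early exit, so the worker is
			// not blocked (see the review comment in chain/block.go).
			batchVerifier.Done(nil)
			return err
		}
		batchVerifier.Add(digest, tx.Auth)
	}
	batchVerifier.Done(nil)

	// Wait returns an error if any enqueued signature batch failed to verify.
	return job.Wait()
}

Passing a workers.Job (now an interface) rather than a concrete pool is what lets the gossiper run this on a throwaway serial worker while block verification keeps the VM's shared parallel pool.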
13 changes: 8 additions & 5 deletions chain/block.go
@@ -110,7 +110,7 @@ type StatelessBlock struct {
vm VM
state merkledb.TrieView

sigJob *workers.Job
sigJob workers.Job
}

func NewBlock(ectx *ExecutionContext, vm VM, parent snowman.Block, tmstp int64) *StatelessBlock {
@@ -159,6 +159,13 @@ func (b *StatelessBlock) populateTxs(ctx context.Context) error {
b.sigJob = job
batchVerifier := NewAuthBatch(b.vm, b.sigJob, b.authCounts)

// Make sure to always call [Done], otherwise we will block all future [Workers]
@patrick-ogrady (Contributor, Author) commented on Aug 13, 2023:
This was likely a bug that has existed for a long time.

defer func() {
// BatchVerifier is given the responsibility to call [b.sigJob.Done()] because it may add things
// to the work queue async and that may not have completed by this point.
go batchVerifier.Done(func() { sigVerifySpan.End() })
}()

// Confirm no transaction duplicates and setup
// AWM processing
b.txsSet = set.NewSet[ids.ID](len(b.Txs))
@@ -200,10 +207,6 @@ func (b *StatelessBlock) populateTxs(ctx context.Context) error {
b.containsWarp = true
}
}

// BatchVerifier is given the responsibility to call [b.sigJob.Done()] because it may add things
// to the work queue async and that may not have completed by this point.
go batchVerifier.Done(func() { sigVerifySpan.End() })
return nil
}

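The review comment above warns that skipping [Done] blocks all future [Workers]. A toy, self-contained illustration of that failure mode follows; the pool type is hypothetical, not hypersdk code, and the deferred release mirrors the defer added to populateTxs in this diff.

package example

import "errors"

var errFailed = errors.New("verification failed")

// pool hands out one job slot at a time, standing in for the shared workers pool.
type pool struct{ slot chan struct{} }

func newPool() *pool { return &pool{slot: make(chan struct{}, 1)} }

// acquire blocks until the previous job released its slot, then returns the release func.
func (p *pool) acquire() (release func()) {
	p.slot <- struct{}{}
	return func() { <-p.slot }
}

func process(p *pool, fail bool) error {
	release := p.acquire()
	// Mirroring the defer added in populateTxs: even if we bail out early
	// below, the slot is released and later jobs are not starved.
	defer release()

	if fail {
		// Without the defer, this early return would leak the slot and every
		// subsequent acquire (i.e. every future job) would block forever.
		return errFailed
	}
	return nil
}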
2 changes: 1 addition & 1 deletion chain/dependencies.go
@@ -33,7 +33,7 @@ type Parser interface {
type VM interface {
Parser

Workers() *workers.Workers
Workers() workers.Workers
Tracer() trace.Tracer
Logger() logging.Logger

10 changes: 6 additions & 4 deletions chain/transaction.go
@@ -371,22 +371,24 @@ func UnmarshalTxs(
initialCapacity int,
actionRegistry ActionRegistry,
authRegistry AuthRegistry,
) ([]*Transaction, error) {
) (map[uint8]int, []*Transaction, error) {
p := codec.NewReader(raw, consts.NetworkSizeLimit)
txCount := p.UnpackInt(true)
authCounts := map[uint8]int{}
txs := make([]*Transaction, 0, initialCapacity) // DoS to set size to txCount
for i := 0; i < txCount; i++ {
tx, err := UnmarshalTx(p, actionRegistry, authRegistry)
if err != nil {
return nil, err
return nil, nil, err
}
txs = append(txs, tx)
authCounts[tx.Auth.GetTypeID()]++
}
if !p.Empty() {
// Ensure no leftover bytes
return nil, ErrInvalidObject
return nil, nil, ErrInvalidObject
}
return txs, p.Err()
return authCounts, txs, p.Err()
}

func UnmarshalTx(
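Since UnmarshalTxs now also returns the per-auth-type counts, existing call sites need a small adjustment. A minimal sketch of a caller that has no use for the counts (gossiper/manual.go below does exactly this); the decodeForMempool wrapper is hypothetical and its registry arguments are whatever the caller already holds:

package example

import "github.com/ava-labs/hypersdk/chain"

// decodeForMempool discards the new per-auth-type counts and keeps only the
// decoded transactions.
func decodeForMempool(
	msg []byte,
	initialCapacity int,
	actionRegistry chain.ActionRegistry,
	authRegistry chain.AuthRegistry,
) ([]*chain.Transaction, error) {
	_, txs, err := chain.UnmarshalTxs(msg, initialCapacity, actionRegistry, authRegistry)
	return txs, err
}

Callers that batch-verify signatures instead keep the map and hand it to NewAuthBatch, as in the proposer change below.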
2 changes: 1 addition & 1 deletion examples/morpheusvm/tests/load/load_test.go
@@ -413,7 +413,7 @@ var _ = ginkgo.Describe("load tests vm", func() {
allTxs := map[ids.ID]struct{}{}
ginkgo.By("generate txs", func() {
start := time.Now()
w := workers.New(numWorkers, 10) // parallelize generation to speed things up
w := workers.NewParallel(numWorkers, 10) // parallelize generation to speed things up
j, err := w.NewJob(512)
gomega.Ω(err).Should(gomega.BeNil())
for i := 0; i < txs; i++ {
2 changes: 1 addition & 1 deletion examples/tokenvm/tests/load/load_test.go
@@ -413,7 +413,7 @@ var _ = ginkgo.Describe("load tests vm", func() {
allTxs := map[ids.ID]struct{}{}
ginkgo.By("generate txs", func() {
start := time.Now()
w := workers.New(numWorkers, 10) // parallelize generation to speed things up
w := workers.NewParallel(numWorkers, 10) // parallelize generation to speed things up
j, err := w.NewJob(512)
gomega.Ω(err).Should(gomega.BeNil())
for i := 0; i < txs; i++ {
1 change: 1 addition & 0 deletions gossiper/dependencies.go
@@ -31,4 +31,5 @@ type VM interface {
Submit(ctx context.Context, verify bool, txs []*chain.Transaction) []error
RecordTxsGossiped(int)
RecordTxsReceived(int)
GetAuthBatchVerifier(authTypeID uint8, cores int, count int) (chain.AuthBatchVerifier, bool)
}
2 changes: 1 addition & 1 deletion gossiper/manual.go
@@ -90,7 +90,7 @@ func (g *Manual) ForceGossip(ctx context.Context) error {

func (g *Manual) HandleAppGossip(ctx context.Context, nodeID ids.NodeID, msg []byte) error {
actionRegistry, authRegistry := g.vm.Registry()
txs, err := chain.UnmarshalTxs(msg, initialCapacity, actionRegistry, authRegistry)
_, txs, err := chain.UnmarshalTxs(msg, initialCapacity, actionRegistry, authRegistry)
if err != nil {
g.vm.Logger().Warn(
"AppGossip provided invalid txs",
48 changes: 43 additions & 5 deletions gossiper/proposer.go
@@ -15,6 +15,7 @@ import (
"github.com/ava-labs/avalanchego/vms/proposervm/proposer"
"github.com/ava-labs/hypersdk/chain"
"github.com/ava-labs/hypersdk/consts"
"github.com/ava-labs/hypersdk/workers"
"go.uber.org/zap"
)

@@ -243,7 +244,7 @@ func (g *Proposer) ForceGossip(ctx context.Context) error {

func (g *Proposer) HandleAppGossip(ctx context.Context, nodeID ids.NodeID, msg []byte) error {
actionRegistry, authRegistry := g.vm.Registry()
txs, err := chain.UnmarshalTxs(msg, initialCapacity, actionRegistry, authRegistry)
authCounts, txs, err := chain.UnmarshalTxs(msg, initialCapacity, actionRegistry, authRegistry)
if err != nil {
g.vm.Logger().Warn(
"received invalid txs",
@@ -273,19 +274,56 @@ func (g *Proposer) HandleAppGossip(ctx context.Context, nodeID ids.NodeID, msg [
}
}

// Add incoming transactions to our caches to prevent useless gossip
// Add incoming transactions to our caches to prevent useless gossip and perform
// batch signature verification.
//
// We rely on AppGossipConcurrency to regulate concurrency here, so we don't create
// a separate pool of workers for this verification.
job, err := workers.NewSerial().NewJob(len(txs))
if err != nil {
g.vm.Logger().Warn(
"unable to spawn new worker",
zap.Stringer("peerID", nodeID),
zap.Error(err),
)
return nil
}
batchVerifier := chain.NewAuthBatch(g.vm, job, authCounts)
for _, tx := range txs {
// Verify signature async
txDigest, err := tx.Digest()
if err != nil {
g.vm.Logger().Warn(
"unable to compute tx digest",
zap.Stringer("peerID", nodeID),
zap.Error(err),
)
batchVerifier.Done(nil)
return nil
}
batchVerifier.Add(txDigest, tx.Auth)

// Track incoming txs
if c != nil {
c.Put(tx.ID(), struct{}{})
}
g.receivedTxs.Put(tx.ID(), struct{}{})
}
batchVerifier.Done(nil)

// Wait for signature verification to finish
if err := job.Wait(); err != nil {
g.vm.Logger().Warn(
"received invalid gossip",
zap.Stringer("peerID", nodeID),
zap.Error(err),
)
return nil
}

// Submit incoming gossip to mempool
//
// TODO: use batch verification and batched repeat check
start := time.Now()
for _, err := range g.vm.Submit(ctx, true, txs) {
for _, err := range g.vm.Submit(ctx, false, txs) {
if err == nil || errors.Is(err, chain.ErrDuplicateTx) {
continue
}
2 changes: 1 addition & 1 deletion vm/resolutions.go
@@ -54,7 +54,7 @@ func (vm *VM) Registry() (chain.ActionRegistry, chain.AuthRegistry) {
return vm.actionRegistry, vm.authRegistry
}

func (vm *VM) Workers() *workers.Workers {
func (vm *VM) Workers() workers.Workers {
return vm.workers
}

47 changes: 26 additions & 21 deletions vm/vm.go
@@ -95,7 +95,7 @@ type VM struct {
webSocketServer *rpc.WebSocketServer

// Reuse goroutine group to avoid constant re-allocation
workers *workers.Workers
workers workers.Workers

bootstrapped utils.Atomic[bool]
preferred ids.ID
@@ -212,7 +212,7 @@ func (vm *VM) Initialize(
}

// Setup worker cluster
vm.workers = workers.New(vm.config.GetParallelism(), 100)
vm.workers = workers.NewParallel(vm.config.GetParallelism(), 100)

// Init channels before initializing other structs
vm.toEngine = toEngine
@@ -683,11 +683,32 @@ func (vm *VM) Submit(
if err != nil {
return []error{err}
}

// Find repeats
oldestAllowed := now - r.GetValidityWindow()
repeats, err := blk.IsRepeat(ctx, oldestAllowed, txs, set.NewBits(), true)
if err != nil {
return []error{err}
}

validTxs := []*chain.Transaction{}
for _, tx := range txs {
for i, tx := range txs {
// Check if transaction is a repeat before doing any extra work
if repeats.Contains(i) {
errs = append(errs, chain.ErrDuplicateTx)
continue
}

// Avoid any sig verification or state lookup if we already have tx in mempool
txID := tx.ID()
// We already verify in streamer, let's avoid re-verification
if vm.mempool.Has(ctx, txID) {
// Don't remove from listeners, it will be removed elsewhere if not
// included
errs = append(errs, ErrNotAdded)
continue
}

// Verify signature if not already verified by caller
if verifySig && vm.config.GetVerifySignatures() {
sigVerify := tx.AuthAsyncVerify()
if err := sigVerify(); err != nil {
@@ -701,23 +722,7 @@
continue
}
}
// Avoid any state lookup if we already have tx in mempool
if vm.mempool.Has(ctx, txID) {
// Don't remove from listeners, it will be removed elsewhere if not
// included
errs = append(errs, ErrNotAdded)
continue
}
// TODO: Batch this repeat check (and collect multiple txs at once)
repeat, err := blk.IsRepeat(ctx, oldestAllowed, []*chain.Transaction{tx}, set.NewBits(), true)
if err != nil {
errs = append(errs, err)
continue
}
if repeat.Len() > 0 {
errs = append(errs, chain.ErrDuplicateTx)
continue
}

// PreExecute does not make any changes to state
//
// This may fail if the state we are utilizing is invalidated (if a trie
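Submit now performs the repeat check once for the whole batch instead of per transaction: IsRepeat returns a bit set whose indices line up with the submitted slice, and set bits mark duplicates inside the validity window. A minimal sketch of that positional-mask pattern; filterRepeats is a hypothetical helper, and in the diff the mask comes from blk.IsRepeat rather than being passed in:

package example

import "github.com/ava-labs/avalanchego/utils/set"

// filterRepeats drops every item whose index is marked in the repeats mask,
// so no further work (signature checks, state lookups) is spent on duplicates.
func filterRepeats[T any](items []T, repeats set.Bits) []T {
	fresh := make([]T, 0, len(items))
	for i, item := range items {
		if repeats.Contains(i) {
			continue // duplicate within the validity window
		}
		fresh = append(fresh, item)
	}
	return fresh
}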